python类full_path()的实例源码

__init__.py 文件源码 项目:Alexandria 作者: indrora 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def needs_authentication():
    """
    Build a route decorator that only lets authenticated sessions through.

    The returned decorator wraps a view so that, when ``is_authenticated()``
    is false, the caller gets a redirect to
    ``sign_auth_path(request.full_path)`` instead of the view's result.
    Otherwise the view is called with the original arguments.
    """
    def auth_chk_wrapper(view):
        @wraps(view)
        def guarded(*args, **kwargs):
            # Authenticated sessions go straight through to the wrapped view.
            if is_authenticated():
                return view(*args, **kwargs)
            # Everyone else is redirected via the signed auth path built
            # from the URL (including query string) they asked for.
            return redirect(sign_auth_path(request.full_path))
        return guarded
    return auth_chk_wrapper
__init__.py 文件源码 项目:alexandria 作者: LibraryBox-Dev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def needs_authentication():
    """
    Return a decorator that protects a route behind session authentication.

    When ``is_authenticated()`` reports false, the wrapped view is not
    called; a redirect to ``sign_auth_path(request.full_path)`` is returned
    in its place. Authenticated requests reach the view unchanged.
    """
    def auth_chk_wrapper(route):
        @wraps(route)
        def checked_route(*args, **kwargs):
            authenticated = is_authenticated()
            if authenticated:
                # Session is already authenticated: run the real route.
                return route(*args, **kwargs)
            # Not authenticated: redirect, carrying the requested path.
            target = sign_auth_path(request.full_path)
            return redirect(target)
        return checked_route
    return auth_chk_wrapper
api.py 文件源码 项目:enjoliver 作者: JulienBalestra 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def metadata():
    """
    Metadata
    ---
    tags:
      - matchbox
    responses:
      200:
        description: Metadata of the current group/profile
        schema:
            type: string
    """
    matchbox_uri = application.config.get("MATCHBOX_URI")

    # Without a configured upstream there is nothing to proxy: answer 403
    # and echo the (falsy) configured value.
    if not matchbox_uri:
        return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")

    # Forward the request path and query string to the upstream and relay
    # its body and status code back as plain text.
    upstream = requests.get("%s%s" % (matchbox_uri, request.full_path))
    payload = upstream.content
    upstream.close()
    return Response(payload, status=upstream.status_code, mimetype="text/plain")
cache.py 文件源码 项目:MagicPress 作者: huang-zp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cached(timeout=5 * 60, key='blog_view_%s'):
    """
    Decorator factory that caches a view's return value per request path.

    :param timeout: value passed as ``timeout`` to ``cache.set``.
    :param key: ``%``-format template; ``request.full_path`` is substituted
        into it to form the cache key.

    POST requests never read the cache: the view is always executed and its
    result overwrites the cached entry. Other methods return the cached
    value when one exists and populate the cache otherwise.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = key % request.full_path
            # A POST skips the lookup so the entry is always refreshed.
            result = None if request.method == 'POST' else cache.get(cache_key)
            if result is None:
                result = f(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return decorated_function
    return decorator
Dr0p1t_Server.py 文件源码 项目:Dr0p1t-Framework 作者: Exploit-install 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def after_request(response):
    """
    Append a one-line access record for the current request to ``server.log``.

    The record contains a timestamp, the client address, HTTP method, URL
    scheme, full path (with query string) and the response status.

    :param response: the response object about to be sent.
    :return: the same ``response``, unmodified.
    """
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    entry = "\n"+"--"*10+"\n"+'%s %s %s %s %s %s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, response.status)
    # Use a context manager so the file handle is closed (and flushed)
    # deterministically instead of being left to the garbage collector.
    with open("server.log", "a") as log_file:
        log_file.write(entry)
    return response
Dr0p1t_Server.py 文件源码 项目:Dr0p1t-Framework 作者: Exploit-install 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def exceptions(e):
    """
    Log the current exception's traceback to ``server.log`` and abort with 500.

    The record contains a timestamp, the client address, HTTP method, URL
    scheme, full path (with query string) and the formatted traceback.

    :param e: the exception being handled (not used directly; the traceback
        is taken from ``traceback.format_exc()``).
    :return: whatever ``abort(500)`` returns.
    """
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    entry = "\n"+"--"*10+"\n"+'%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, tb)
    # Use a context manager so the file handle is closed (and flushed)
    # deterministically instead of being left to the garbage collector.
    with open("server.log", "a") as log_file:
        log_file.write(entry)
    return abort(500)
web.py 文件源码 项目:automatron 作者: madflojo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def before_request():
    ''' Pre-request handler: log the incoming path and attach a DB connection to ``g`` '''
    incoming = "Incoming Web Request: {0}".format(request.full_path)
    logger.debug(incoming)
    # The connection built from the app config is stored on ``g.dbc``.
    connection = connect_db(app.config)
    g.dbc = connection
__init__.py 文件源码 项目:BookCloud 作者: livro-aberto 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def internal_server_error(e):
    """
    Handle an internal error: gather diagnostics, notify the admin, render 500.

    A text report is assembled from the exception's ``repr``, a timestamp,
    the client address, method, scheme, full path, the current user's name
    (or ``'anonymous'``) and the formatted traceback. When
    ``app.config['TESTING']`` is true the report is printed; otherwise it is
    mailed to ``app.config['ADMIN_MAIL']`` and a notice is flashed.

    :param e: the exception being handled.
    :return: ``(rendered '500.html' template, 500)``.
    """
    message = repr(e)
    # The original split the traceback on '\n' via the Python-2-only
    # ``string.split`` and immediately re-joined it with '\n'; that round
    # trip is a no-op, so the traceback text is used directly.
    trace = traceback.format_exc()
    timestamp = (datetime.fromtimestamp(time.time())
                 .strftime('%Y-%m-%d %H:%M:%S'))
    if current_user.is_authenticated:
        user = current_user.username
    else:
        user = 'anonymous'
    gathered_data = ('message: {}\n\n\n'
                     'timestamp: {}\n'
                     'ip: {}\n'
                     'method: {}\n'
                     'request.scheme: {}\n'
                     'request.full_path: {}\n'
                     'user: {}\n\n\n'
                     'trace: {}'.format(message, timestamp,
                                        request.remote_addr, request.method,
                                        request.scheme, request.full_path,
                                        user, trace))
    # send email to admin
    if app.config['TESTING']:
        print(gathered_data)
    else:
        # The subject carries only the first 40 characters of the repr.
        msg = Message('Error: ' + message[:40],
                      body=gathered_data,
                      recipients=[app.config['ADMIN_MAIL']])
        mail.send(msg)
        flash(_('A message has been sent to the administrator'), 'info')
    # Called before rendering the error page.
    # NOTE(review): presumably sets up template context - confirm.
    bookcloud_before_request()
    return render_template('500.html', message=gathered_data), 500
view.py 文件源码 项目:osm-wikidata 作者: EdwardBetts 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save_timing(name, t0):
    """
    Add a ``Timing`` row for the current request to the database session.

    :param name: label stored in the row's ``name`` field.
    :param t0: start time; stored as ``start`` and subtracted from the
        current ``time()`` to produce ``seconds``.
    """
    record_fields = dict(
        start=t0,
        path=request.full_path,
        name=name,
        seconds=time() - t0,
    )
    # Only added to the session here; no commit happens in this function.
    database.session.add(Timing(**record_fields))
controller.py 文件源码 项目:restful-api 作者: TeamGhostBuster 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_comment(user, article_id):
    """
    @api {post} /article/:id/comment Post comment to an article
    @apiName Post comment to an article
    @apiGroup Comment

    @apiUse AuthorizationTokenHeader

    @apiParam {String} id Article unique ID.
    @apiParam {String} comment The comment to be added
    @apiParam {Boolean} public The privacy setting for the comment, default is True
    @apiParamExample Request (Example)
        {
            "comment": "I hate you",
            "public": "false"
        }

    @apiUse UnauthorizedAccessError
    @apiUse ResourceDoesNotExist
    @apiUse BadRequest
    """
    app.logger.info('User {} Access {}'.format(user, request.full_path))

    # Pull the comment text and visibility flag out of the request body.
    payload = RequestUtil.get_request()
    outcome = MongoUtil.add_comment(
        user,
        article_id,
        payload.get('comment'),
        payload.get('public'),
    )

    # A plain string result signals an error; pass it to the error response.
    if isinstance(outcome, str):
        return ResponseUtil.error_response(outcome)

    app.logger.info('User {} Create comment {}'.format(user, outcome.id))
    serialized = JsonUtil.serialize(outcome)
    return jsonify(serialized)
browser.py 文件源码 项目:Alexandria 作者: indrora 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def upload(path=""):
    """
    Upload a file into ``path`` below the configured storage location.

    Requests for which ``can_upload()`` is false are redirected to
    ``sign_auth_path(request.full_path)``. A POST must carry a ``file``
    part (otherwise 400); the stored name defaults to the uploaded file's
    name and can be overridden with a ``name`` form field. The name is
    passed through ``secure_filename`` and must satisfy
    ``allowed_filename`` (otherwise 400). Any other method renders the
    upload form.

    :param path: destination directory, relative to the storage base path.
    :return: a redirect or the rendered upload template.
    """
    # Check that uploads are OK
    if not can_upload():
        return redirect(sign_auth_path(request.full_path))
    if request.method=='POST':
        # handle uploaded files
        if 'file' not in request.files:
            abort(400) # General UA error
        file = request.files["file"]
        # We default to using the name of the file provided,
        # but allow the filename to be changed via POST details.
        fname = file.filename
        if 'name' in request.form:
            fname = request.form['name']
        safe_fname = secure_filename(fname)
        if not allowed_filename(safe_fname):
            abort(400) # General client error
        # We're handling a potentially dangerous path, better run it through
        # The flask path jointer.
        # NOTE(review): ``get("storage", "location")`` reads like a
        # section/option lookup - confirm app.config supports that form.
        basepath=app.config.get("storage", "location")
        fullpath = safe_join(basepath, path)
        # Save under the sanitised name that was just validated. The raw
        # client-supplied ``fname`` may contain path separators or "..".
        file.save(os.path.join(fullpath, safe_fname))
        flash("File uploaded successfully")
        return redirect(url_for('browser.upload',path=path))
    else:
        return render_template("browser/upload.html",path=path,title="Upload Files")
tor_cache.py 文件源码 项目:freshonions-torscraper 作者: dirtyfilthy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __call__(self, f):
        @functools.wraps(f)
        def my_decorator(*args, **kwargs):
            global _is_cached
            _is_cached = True
            if _cache is None:
                return f(*args, **kwargs)
            response = _cache.get(request.full_path)
            if response is None:
                response = f(*args, **kwargs)
                _cache.set(request.path, response, self.timeout)
            _is_cached = False

            if self.render_layout:
                wrapped_response = make_response("%s%s%s" % (render_template("layout_header.html"), response, render_template("layout_footer.html")))
                if is_response(response):
                    wrapped_response.status_code = response.status_code
                    wrapped_response.headers     = response.headers
                    wrapped_response.status      = response.status
                    wrapped_response.mimetype    = response.mimetype
                return wrapped_response
            else:
                return response

        functools.update_wrapper(my_decorator, f)
        return my_decorator
app.py 文件源码 项目:freshonions-torscraper 作者: dirtyfilthy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_session():
    """
    Prepare the session for a request and record the request in the database.

    Marks the session permanent, sets the app's permanent session lifetime,
    assigns a UUID to sessions that lack one, and stores whether it was just
    created in ``g.uuid_is_fresh``. Requests whose User-Agent is in
    ``BLACKLIST_AGENT`` or shorter than 15 characters get an error page and
    ``g.request_log_id = 0``. All others are written as a ``RequestLog``
    row, whose id is stored in ``g.request_log_id``.
    """
    session.permanent = True
    app.permanent_session_lifetime = timedelta(days=365*30)

    # A session without a uuid is new: mint one and flag it as fresh.
    g.uuid_is_fresh = 'uuid' not in session
    if g.uuid_is_fresh:
        session['uuid'] = str(uuid.uuid4())
    now = datetime.now()

    headers = request.headers
    referrer = headers.get('Referer', '')
    agent = headers.get('User-Agent', '')

    blocked = agent in BLACKLIST_AGENT or len(agent) < 15
    if blocked:
        g.request_log_id = 0
        return render_template('error.html',code=200,message="Layer 8 error. If you want my data, DON'T SCRAPE (too much cpu load), contact me and I will give it to you"), 200

    with db_session:
        entry = RequestLog(
            uuid=session['uuid'],
            uuid_is_fresh=g.uuid_is_fresh,
            created_at=now,
            agent=agent,
            referrer=referrer,
            path=request.path,
            full_path=request.full_path,
        )
        # Flush so the new row has an id before it is read below.
        flush()
        g.request_log_id = entry.id
auth.py 文件源码 项目:opennms_alarmforwarder 作者: NETHINKS 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def login_required(func):
    """Decorator for views that require authentication.

    Access is granted when the session has a ``username`` key, or when the
    request carries HTTP basic credentials that the configured
    authentication provider accepts. Otherwise a JSON client gets a 401
    error, and any other client has ``request.full_path`` stored in
    ``session["redirect"]`` and is redirected to ``/login``.

    Exceptions raised by the wrapped view itself propagate to the caller.
    The original bare ``except:`` blocks also swallowed those, turning
    every view error (including ``abort()``) into a login redirect.
    """
    @wraps(func)
    def handle_login_required(*args, **kwargs):
        # A username in the session means the user is already logged in.
        if "username" in session:
            return func(*args, **kwargs)

        # allow HTTP basic auth
        credentials = request.authorization
        if credentials is not None:
            try:
                authprovider = security.AuthenticationProvider.get_authprovider()
                authenticated = authprovider.authenticate(
                    credentials.username, credentials.password)
            except Exception:
                # Best effort, as in the original: a failing auth backend
                # is treated as "not authenticated", not as a crash.
                authenticated = False
            if authenticated:
                return func(*args, **kwargs)

        # on error: redirect to login page or return HTTP/401
        if json_check():
            return json_error("Unauthenticated", 401)
        session["redirect"] = request.full_path
        return redirect("/login")
    return handle_login_required
feedback.py 文件源码 项目:eq-survey-runner 作者: ONSdigital 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def before_request():
    """
    Log the feedback request and, when metadata exists, load its schema.

    If ``get_metadata(current_user)`` returns something truthy, the
    metadata's ``tx_id`` is bound to the logger and the schema loaded from
    the metadata is stored on ``g.schema_json``.
    """
    logger.info('feedback request', url_path=request.full_path)

    metadata = get_metadata(current_user)
    if not metadata:
        # Nothing more to set up without metadata.
        return

    logger.bind(tx_id=metadata['tx_id'])
    g.schema_json = load_schema_from_metadata(metadata)
questionnaire.py 文件源码 项目:eq-survey-runner 作者: ONSdigital 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def before_request():
    """
    Bind logging context for a questionnaire request and load its schema.

    The ``tx_id`` (when metadata exists) and the ``eq_id``, ``form_type``
    and ``collection_id`` view arguments are bound to the logger before the
    request is logged. When metadata exists, the schema is stored on
    ``g.schema_json`` and ``_check_same_survey`` is called with the view
    arguments.
    """
    metadata = get_metadata(current_user)
    has_metadata = bool(metadata)
    if has_metadata:
        logger.bind(tx_id=metadata['tx_id'])

    view_args = request.view_args
    eq_id = view_args['eq_id']
    form_type = view_args['form_type']
    collection_id = view_args['collection_id']
    logger.bind(eq_id=eq_id, form_type=form_type, ce_id=collection_id)
    logger.info('questionnaire request', method=request.method, url_path=request.full_path)

    if has_metadata:
        g.schema_json = load_schema_from_metadata(metadata)
        _check_same_survey(eq_id, form_type, collection_id)
browser.py 文件源码 项目:alexandria 作者: LibraryBox-Dev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def upload(path=""):
    """
    Upload a file into ``path`` below the configured storage location.

    Requests for which ``can_upload()`` is false are redirected to
    ``sign_auth_path(request.full_path)``. A POST must carry a ``file``
    part (otherwise 400); the stored name defaults to the uploaded file's
    name and can be overridden with a ``name`` form field. The name is
    passed through ``secure_filename`` and must satisfy
    ``allowed_filename`` (otherwise 400). Any other method renders the
    upload form.

    :param path: destination directory, relative to the storage base path.
    :return: a redirect or the rendered upload template.
    """
    # Check that uploads are OK
    if not can_upload():
        return redirect(sign_auth_path(request.full_path))
    if request.method=='POST':
        # handle uploaded files
        if 'file' not in request.files:
            abort(400) # General UA error
        file = request.files["file"]
        # We default to using the name of the file provided,
        # but allow the filename to be changed via POST details.
        fname = file.filename
        if 'name' in request.form:
            fname = request.form['name']
        safe_fname = secure_filename(fname)
        if not allowed_filename(safe_fname):
            abort(400) # General client error
        # We're handling a potentially dangerous path, better run it through
        # The flask path jointer.
        # NOTE(review): ``get("storage", "location")`` reads like a
        # section/option lookup - confirm app.config supports that form.
        basepath=app.config.get("storage", "location")
        fullpath = safe_join(basepath, path)
        # Save under the sanitised name that was just validated. The raw
        # client-supplied ``fname`` may contain path separators or "..".
        file.save(os.path.join(fullpath, safe_fname))
        flash("File uploaded successfully")
        return redirect(url_for('browser.upload',path=path))
    else:
        return render_template("browser/upload.html",path=path,title="Upload Files")
mmwatch.py 文件源码 项目:mmwatch 作者: Zverik 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cached(timeout=1 * 60, key='view'):
    """
    Decorator factory that caches a view's return value per request path.

    :param timeout: value passed as ``timeout`` to ``cache.set``.
    :param key: prefix joined with ``request.full_path`` (as
        ``'<key>/<full_path>'``) to form the cache key.

    A cached value that is not ``None`` is returned as-is; otherwise the
    view runs and its result is stored.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = '{}/{}'.format(key, request.full_path)
            result = cache.get(cache_key)
            if result is None:
                # Cache miss: compute the value and remember it.
                result = f(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return decorated_function
    return decorator
models.py 文件源码 项目:knowledge-repo 作者: airbnb 项目源码 文件源码 阅读 55 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
            """
            Invoke the wrapped route, recording a ``PageView`` for the request.

            When the ``INDEXING_ENABLED`` config value is false the route is
            called directly and nothing is logged. Otherwise a ``PageView``
            row is created and committed after the route finishes. If the
            route raises, the session is rolled back, an ``ErrorLog`` built
            from the exception is committed and linked to the page view, and
            the exception is re-raised.
            """
            # Indexing disabled: skip all page-view bookkeeping.
            if not current_app.config.get('INDEXING_ENABLED', True):
                return self._route(*args, **kwargs)

            log = PageView(
                page=request.full_path,
                endpoint=request.endpoint,
                user_id=current_user.id,
                ip_address=request.remote_addr,
                version=__version__
            )
            errorlog = None
            # extract_objects also reports whether the extraction should be
            # repeated once the route has run (see the finally block).
            log.object_id, log.object_type, log.object_action, reextract_after_request = self.extract_objects(*args, **kwargs)
            db_session.add(log)  # Add log here to ensure pageviews are accurate

            try:
                return self._route(*args, **kwargs)
            except Exception as e:
                db_session.rollback()  # Ensure no lingering database changes remain after crashed route
                # The rollback discarded the pending page view, so add it
                # again alongside the error record before committing.
                db_session.add(log)
                errorlog = ErrorLog.from_exception(e)
                db_session.add(errorlog)
                db_session.commit()
                raise_with_traceback(e)
            finally:
                # Extract object id and type after response generated (if requested) to ensure
                # most recent data is collected
                if reextract_after_request:
                    log.object_id, log.object_type, log.object_action, _ = self.extract_objects(*args, **kwargs)

                # Link the page view to the error record when the route failed.
                if errorlog is not None:
                    log.id_errorlog = errorlog.id
                db_session.add(log)
                db_session.commit()
__init__.py 文件源码 项目:PyHub 作者: 521xueweihan 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def after_request(response):
    """
    Log one access line for the current request and return ``response`` unchanged.

    The line holds the client address, method, scheme, full path and the
    response status.
    """
    fields = (
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
    )
    logger.info('%s %s %s %s %s', *fields)
    return response
__init__.py 文件源码 项目:PyHub 作者: 521xueweihan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def exceptions(e):
    """
    Log the current traceback with request details and return ``e.status_code``.

    NOTE(review): this returns a bare status code and assumes ``e`` has a
    ``status_code`` attribute - confirm the caller accepts that.
    """
    formatted_traceback = traceback.format_exc()
    fields = (
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        formatted_traceback,
    )
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s', *fields)
    return e.status_code
__init__.py 文件源码 项目:hellogithub.com 作者: 521xueweihan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def after_request(response):
    """
    Log one access line for the current request and return ``response`` unchanged.

    The client address is taken from the ``HTTP_X_REAL_IP`` environ entry
    when present, falling back to ``request.remote_addr``.
    """
    method = request.method
    client_ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
    logger.info('%s %s %s %s %s', method, client_ip,
                request.scheme, request.full_path, response.status)
    return response
__init__.py 文件源码 项目:hellogithub.com 作者: 521xueweihan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def exceptions(e):
    """
    Log the current traceback with request details and answer with a 500.

    The client address is taken from the ``HTTP_X_REAL_IP`` environ entry
    when present, falling back to ``request.remote_addr``.

    :param e: the exception being handled (the traceback itself comes from
        ``traceback.format_exc()``).
    :return: ``('500 INTERNAL SERVER ERROR', 500)``.
    """
    tb = traceback.format_exc()
    # format_exc() returns a byte string only on Python 2. On Python 3 it
    # is already text and has no ``decode`` method, so the unconditional
    # decode raised AttributeError inside the error handler.
    if isinstance(tb, bytes):
        tb = tb.decode('utf-8')
    logger.error('%s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
                 request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
                 request.method, request.scheme, request.full_path, tb)
    return '500 INTERNAL SERVER ERROR', 500
intranet.py 文件源码 项目:extranet 作者: demaisj 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _before_request(self):
        if self._has_token_checker is None or self.token_view is None:
            return

        if request.endpoint != self.token_view and request.endpoint not in self.token_view_overrides and self._has_token_checker() is not None and not self._has_token_checker():
            return redirect(url_for(self.token_view, next=request.full_path))
app.py 文件源码 项目:shorty 作者: PadamSethia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def after_request(response):
    """
    Log one timestamped access line for the request and return ``response`` unchanged.

    The line is emitted through ``logger.error`` and holds the timestamp,
    client address, method, scheme, full path and response status.
    """
    fields = (
        strftime('[%Y-%b-%d %H:%M]'),
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
    )
    logger.error('%s %s %s %s %s %s', *fields)
    return response
app.py 文件源码 项目:shorty 作者: PadamSethia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def exceptions(e):
    """
    Log the current traceback with request details and build a 405 response from ``e``.

    The log line holds a timestamp, client address, method, scheme, full
    path and the formatted traceback.
    """
    formatted_traceback = traceback.format_exc()
    fields = (
        strftime('[%Y-%b-%d %H:%M]'),
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        formatted_traceback,
    )
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s', *fields)
    return make_response(e , 405)
api.py 文件源码 项目:enjoliver 作者: JulienBalestra 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ipxe():
    """
    iPXE
    ---
    tags:
      - matchbox
    responses:
      200:
        description: iPXE script
        schema:
            type: string
      404:
        description: Not valid
        schema:
            type: string
    """
    app.logger.info("%s %s" % (request.method, request.url))
    # Same path and query string as the incoming request, on the upstream.
    upstream_url = "%s%s" % (app.config["MATCHBOX_URI"], request.full_path)
    try:
        upstream = requests.get(upstream_url)
        upstream.close()
        script = upstream.content.decode()

        # When a MAC is supplied, mark that machine as booting; dashes in
        # the MAC are replaced with colons first.
        mac = request.args.get("mac")
        if mac:
            normalized_mac = mac.replace("-", ":")
            repositories.machine_state.update(normalized_mac, MachineStates.booting)

        return Response(script, status=200, mimetype="text/plain")

    except requests.exceptions.ConnectionError:
        # Upstream unreachable: report it as a 404 to the client.
        app.logger.warning("404 for /ipxe")
        return "404", 404
repo.py 文件源码 项目:Penny-Dreadful-Tools 作者: PennyDreadfulMTG 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    """
    Create a GitHub issue from free-form text and return it.

    The first line of ``content`` becomes the title and the remainder (if
    any) starts the body. A "Reported on ... by ..." line is appended, and
    request details are added when ``request`` is truthy. The title and
    body are always printed. The issue is only created when both
    ``github_user`` and ``github_password`` are configured.

    :param content: issue text; ``None`` or ``''`` yields ``None``.
    :param author: name included in the "Reported ... by" line.
    :param location: where the report came from.
    :param repo: repository name passed to ``get_repo``.
    :return: the created issue, or ``None`` when nothing was created.
    """
    if content is None or content == '':
        return None

    # Split on the first newline: the title before it, the body text after it.
    title, separator, remainder = content.partition('\n')
    body = remainder + '\n\n' if separator else ''
    body += 'Reported on {location} by {author}'.format(location=location, author=author)
    if request:
        body += textwrap.dedent("""
            --------------------------------------------------------------------------------
            Request Method: {method}
            Path: {full_path}
            Cookies: {cookies}
            Endpoint: {endpoint}
            View Args: {view_args}
            Person: {id}
            User-Agent: {user_agent}
            Referrer: {referrer}
        """.format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer))
    print(title + '\n' + body)
    # Only check for github details at the last second to get log output even if github not configured.
    if not configuration.get('github_user') or not configuration.get('github_password'):
        return None
    client = Github(configuration.get('github_user'), configuration.get('github_password'))
    repository = client.get_repo(repo)
    return repository.create_issue(title=title, body=body)
initialize.py 文件源码 项目:SynDBB 作者: D2K5 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def after_request(response):
    """
    Log one bracketed access line for the request and return ``response`` unchanged.

    The line holds a timestamp, client address, user agent, method, scheme,
    full path, response status and referrer.
    """
    fields = (
        strftime('%Y-%b-%d %H:%M:%S'),
        request.remote_addr,
        request.user_agent,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
        request.referrer,
    )
    logger.info('[%s] [%s] [%s] [%s] [%s] [%s] [%s] [%s]', *fields)
    return response
initialize.py 文件源码 项目:SynDBB 作者: D2K5 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def exceptions(e):
    """
    Log the current traceback with request details and return ``e.status_code``.

    NOTE(review): this returns a bare status code and assumes ``e`` has a
    ``status_code`` attribute - confirm the caller accepts that.
    """
    formatted_traceback = traceback.format_exc()
    fields = (
        strftime('[%Y-%b-%d %H:%M]'),
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        formatted_traceback,
    )
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s', *fields)
    return e.status_code

#Run the main app...


问题


面经


文章

微信
公众号

扫码关注公众号