python类endpoint()的实例源码

pagination.py 文件源码 项目:cookiecutter-flask-restful 作者: karec 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def paginate(query, schema):
    page = request.args.get('page', DEFAULT_PAGE_NUMBER)
    per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE)
    page_obj = query.paginate(page=page, per_page=per_page)
    next = url_for(
        request.endpoint,
        page=page_obj.next_num if page_obj.has_next else page_obj.page,
        per_page=per_page,
        **request.view_args
    )
    prev = url_for(
        request.endpoint,
        page=page_obj.prev_num if page_obj.has_prev else page_obj.page,
        per_page=per_page,
        **request.view_args
    )

    return {
        'total': page_obj.total,
        'pages': page_obj.pages,
        'next': next,
        'prev': prev,
        'results': schema.dump(page_obj.items).data
    }
audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, options, func, request_context):
        self.options = options
        self.operation = request.endpoint
        self.func = func.__name__
        self.method = request.method
        self.args = request.args
        self.view_args = request.view_args
        self.request_context = request_context
        self.timing = dict()

        self.error = None
        self.stack_trace = None
        self.request_body = None
        self.response_body = None
        self.response_headers = None
        self.status_code = None
        self.success = None
decorators.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def api_deprecated(new_endpoint, message='This endpoint is deprecated.'):
    """Decorator that adds a deprecation message for and endpoint.
    Decorated function will not be executed.

    :param new_endpoint: New endpoint to use
    :param message: Warning message
    :return:
    :rtype: func
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            response = jsonify({
                'message': message,
                'endpoint': url_for(new_endpoint, _external=True)
            })
            # response = jsonify(rv)
            response.status_code = 301
            response.headers['DO-New-Endpoint'] = \
                url_for(new_endpoint, _external=True)
            return response
        return wrapped
    return decorator
tenant_manager.py 文件源码 项目:os-reststack-manager 作者: gemagomez 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def authenticate():
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
app.py 文件源码 项目:FastIR_Server 作者: SekoiaLab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def before_request():
    if request.endpoint != 'root':
        if request.method != 'POST':
            response = generate_response(
                **{
                    'return': 'KO',
                    'data': 'Unsupported HTTP method'
                }
            )
            return response, 400
        elif request.form.get('APIKey') != Master_APIKey:
            response = generate_response(
                **{
                    'return': 'KO',
                    'data': 'Bad API key'
                }
            )
            return response, 403
api_v1.py 文件源码 项目:fabric8-analytics-server 作者: fabric8-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def error():
    """This endpoint is used by httpd, which redirects its errors to it."""
    try:
        status = int(request.environ['REDIRECT_STATUS'])
    except Exception:
        # if there's an exception, it means that a client accessed this directly;
        #  in this case, we want to make it look like the endpoint is not here
        return api_404_handler()
    msg = 'Unknown error'
    # for now, we just provide specific error for stuff that already happened;
    #  before adding more, I'd like to see them actually happening with reproducers
    if status == 401:
        msg = 'Authentication failed'
    elif status == 405:
        msg = 'Method not allowed for this endpoint'
    raise HTTPError(status, msg)
api_v1.py 文件源码 项目:fabric8-analytics-server 作者: fabric8-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_resource_no_matter_slashes(resource, route, endpoint=None, defaults=None):
    """Adds a resource for both trailing slash and no trailing slash to prevent redirects.
    """
    slashless = route.rstrip('/')
    _resource_paths.append(api_v1.url_prefix + slashless)
    slashful = route + '/'
    endpoint = endpoint or resource.__name__.lower()
    defaults = defaults or {}

    rest_api_v1.add_resource(resource,
                             slashless,
                             endpoint=endpoint + '__slashless',
                             defaults=defaults)
    rest_api_v1.add_resource(resource,
                             slashful,
                             endpoint=endpoint + '__slashful',
                             defaults=defaults)
api_v1.py 文件源码 项目:fabric8-analytics-server 作者: fabric8-analytics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def stack_analyses_debug(external_request_id):
    """Debug endpoint exposing operational data for particular stack analysis.

    This endpoint is not part of the public API.

    Note the existence of the data is not guaranteed,
    therefore the endpoint can return 404 even for valid request IDs.
    """

    results = retrieve_worker_results(rdb, external_request_id)
    if not results:
        return jsonify(error='No operational data for the request ID'), 404

    response = {'tasks': []}
    for result in results:
        op_data = result.to_dict()
        audit = op_data.get('task_result', {}).get('_audit', {})
        task_data = {'task_name': op_data.get('worker')}
        task_data['started_at'] = audit.get('started_at')
        task_data['ended_at'] = audit.get('ended_at')
        task_data['error'] = op_data.get('error')
        response['tasks'].append(task_data)
    return jsonify(response), 200
view.py 文件源码 项目:hreftoday 作者: soasme 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def templated(template=None):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            template_name = template
            if template_name is None:
                template_name = request.endpoint \
                    .replace('.', '/') + '.html'
            ctx = f(*args, **kwargs)
            if ctx is None:
                ctx = {}
            elif not isinstance(ctx, dict):
                return ctx
            return render_template(template_name, **ctx)
        return decorated_function
    return decorator
flask_paginate.py 文件源码 项目:lalascan 作者: blackye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init_values(self):
        current_total = self.found if self.search else self.total
        pages = divmod(current_total, self.per_page)
        self.total_pages = pages[0] + 1 if pages[1] else pages[0]
        self.has_prev = self.page > 1
        self.has_next = self.page < self.total_pages

        args = request.args.copy()
        args.update(request.view_args.copy())
        self.args = {}
        for k, v in args.lists():
            if len(v) == 1:
                self.args[k] = v[0]
            else:
                self.args[k] = v

        self.endpoint = request.endpoint
credential.py 文件源码 项目:railgun 作者: xin-xinhanggao 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def should_update_email():
    """Check whether the current user has not set his or her email address.

    Some authentication providers may not give the email addresses.
    However, Railgun relies on email addresss to finish login process.
    So the users from these providers will be given `fake` email addresses.

    :class:`should_update_email` should be called before a view is executed,
    check whether the user should be redirected to
    :func:`~railgun.website.views.profile_edit` to fill in his or her
    email address.

    :return: :data:`True` if the user has a fake email address, :data:`False`
        otherwise.

    .. note::

        If the current view is :func:`~railgun.website.views.profile_edit`,
        this method will return :data:`False`.
    """
    if (current_user.email.endswith(app.config['EXAMPLE_USER_EMAIL_SUFFIX'])
            and request.endpoint != 'profile_edit'):
        return True
navibar.py 文件源码 项目:railgun 作者: xin-xinhanggao 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def make_view(title, endpoint, *args, **kwargs):
        """A shortcut to make a :class:`NaviItem` linking to a standard
        Flask view.  Equal to the following statement:

        .. code-block:: python

            NaviItem(
                title=title,
                url=lambda: url_for(endpoint, *args, **kwargs),
                identity=endpoint
            )

        :param title: The title of new :class:`NaviItem`
        :param endpoint: The endpoint of target view.
        :type endpoint: :class:`str`
        :param args: The unnamed arguments to :func:`flask.url_for` when
            generating the `url`.
        :param kwargs: The named arguments to :func:`flask.url_for` when
            generating the `url`.
        """
        return NaviItem(title, lambda: url_for(endpoint, *args, **kwargs),
                        endpoint)
monkey.py 文件源码 项目:kael 作者: 360skyeye 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def patch_validate_handler(name, bl=None):
    def params_validate_handler():
        if not request.endpoint \
                or not request.endpoint.startswith('{0}.'.format(name)) \
                or request.method == 'HEAD':
            return
        _, blueprint, endpoint = request.endpoint.split('.')
        mapping = get_mapping(blueprint, endpoint)
        if mapping:
            params = request.get_json()
            if validate(params, mapping, format_checker=formatchecker):
                raise ArgumentError('Json schema validate failed')

    if bl:
        bl.before_app_first_request(params_validate_handler)
    else:
        sys.modules['flask'].app.before_app_first_request(params_validate_handler)

    return params_validate_handler
ratelimit.py 文件源码 项目:ColdCore 作者: IceCTF 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ratelimit(limit, per=300, send_x_headers=True,
              methods=["POST"],
              over_limit=on_over_limit,
              scope_func=scope_func,
              key_func=lambda: request.endpoint):
    def decorator(f):
        def rate_limited(*args, **kwargs):
            if request.method in methods:
                key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                rlimit = RateLimit(key, limit, per, send_x_headers)
                g._view_rate_limit = rlimit
                if over_limit is not None and rlimit.over_limit:
                    return over_limit(rlimit)
            return f(*args, **kwargs)
        return update_wrapper(rate_limited, f)
    return decorator
__init__.py 文件源码 项目:go_basketball 作者: ZhaoPengkun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def before_request():
    request.start_time = datetime.now()


# @bp.after_request
# def after_request(resp):
#     try:
#         if '_' in request.endpoint:
#             dbcon = influx_db.connection
#             point = [{"measurement": config.APP_NAME,
#                       "tags": {"method": request.method, "status": resp.status_code, "endpoint":
#                           request.endpoint},
#                       "fields": {"base_url": request.base_url, "remote_address": request.remote_addr,
#                                  'response_time': (datetime.now() - request.start_time).microseconds}}]
#             dbcon.write_points(point)
#     except Exception as e:
#         pass
#         logger.debug('Write api statistics data to influxdb failed, error?' + e.message)
#     return resp
__init__.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 69 收藏 0 点赞 0 评论 0
def ratelimit(limit, per, send_x_headers=True,
              scope_func=lambda: request.remote_addr,
              key_func=lambda: request.endpoint,
              path=lambda: request.path):
    """
    Decorator for limiting the access to a route.

    Returns the function if within the limit, otherwise TooManyRequests error

    """
    def decorator(f):
        @wraps(f)
        def rate_limited(*args, **kwargs):
            try:
                key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                rlimit = RateLimit(key, limit, per, send_x_headers)
                g._view_rate_limit = rlimit
                #if over_limit is not None and rlimit.over_limit:
                if rlimit.over_limit:
                    raise TooManyRequests
                return f(*args, **kwargs)
            except Exception as e:
                return error.format_exception(e, target=path(),
                                              action=f.__name__)
        return update_wrapper(rate_limited, f)
    return decorator
views.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def log_request(resp):
    if request.endpoint != "get_doi_endpoint":
        return

    logging_start_time = time()

    try:
        results = json.loads(resp.get_data())["results"][0]
    except (ValueError, RuntimeError, KeyError):
        # don't bother logging if no results
        return

    oa_color = results["oa_color"]
    if not oa_color:
        oa_color = "gray"

    body = {
        "timestamp": datetime.utcnow().isoformat(),
        "elapsed": elapsed(g.request_start_time, 2),
        "ip": get_ip(),
        "status_code": resp.status_code,
        "email": request.args.get("email", None),
        "doi": results["doi"],
        "year": results.get("year", None),
        "oa_color": oa_color
    }

    h = {
        "content-type": "text/json",
        "X-Forwarded-For": get_ip()
    }

    url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format(
        oa_color)
    requests.post(url, headers=h, data=json.dumps(body))
    # logger.info(u"log_request took {} seconds".format(elapsed(logging_start_time, 2)))
views.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def print_ip():
    user_agent = request.headers.get('User-Agent')
    logger.info(u"calling from IP {ip}. User-Agent is '{user_agent}'.".format(
        ip=get_ip(),
        user_agent=user_agent
    ))


# this is the old way of expressing this endpoint.
# the new way is POST api.oadoi.org/
# you can give it an object that lists DOIs
# you can also give it an object that lists biblios.
# this is undocumented and is just for impactstory use now.
views.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_doi_endpoint(doi):
    # the GET api endpoint (returns json data)
    my_pub = get_pub_from_doi(doi)
    return jsonify({"results": [my_pub.to_dict()]})
views.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_doi_endpoint_v2(doi):
    # the GET api endpoint (returns json data)
    my_pub = get_pub_from_doi(doi)
    return jsonify(my_pub.to_dict_v2())
views.py 文件源码 项目:circleci-demo-python-flask 作者: CircleCI-Public 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def before_request():
    if current_user.is_authenticated:
        current_user.ping()
        if not current_user.confirmed \
                and request.endpoint[:5] != 'auth.' \
                and request.endpoint != 'static':
            return redirect(url_for('auth.unconfirmed'))
jwt.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_jwt_authorization():
    current_identity = getattr(_request_ctx_stack.top,
                               'current_identity', None)
    if current_identity:
        return current_identity

    skip_check = False

    if current_app.config.get("disable_jwt", False):
        skip_check = True

    if request.endpoint in current_app.view_functions:
        fn = current_app.view_functions[request.endpoint]

        # Check Flask-RESTful endpoints for openness
        if hasattr(fn, "view_class"):
            exempt = getattr(fn.view_class, "no_jwt_check", [])
            if request.method in exempt:
                skip_check = True
        elif fn in _open_endpoints:
            skip_check = True

    # the static folder is open to all without authentication
    if request.endpoint == "static" or request.url.endswith("favicon.ico"):
        skip_check = True

    # In case the endpoint requires no authorization, and the request does not
    # carry any authorization info as well, we will not try to verify any JWT's
    if skip_check and 'Authorization' not in request.headers:
        return

    token, auth_type = get_auth_token_and_type()
    current_identity = verify_token(token, auth_type)
    if auth_type == "JWT":
        # Cache this token
        cache_token(current_identity)

    # Authorization token has now been converted to a verified payload
    _request_ctx_stack.top.current_identity = current_identity
    return current_identity
jwt.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requires_roles(_roles):
    """
        endpoint decorator to lock down an endpoint
        on a set of roles (comma delimitered)
    """
    def wrapper(fn):
        @wraps(fn)
        def decorator(*args, **kwargs):
            if not _roles:
                return fn(*args, **kwargs)
            current_user = query_current_user()
            if not current_user:
                abort_unauthorized("You do not have access to this resource."
                                   " It requires role '%s'" % _roles)

            required_roles = set(_roles.split(","))
            user_roles = set(current_user.get("roles", []))
            if not required_roles.intersection(user_roles):
                log.warning("User does not have the needed roles for this "
                            "call. User roles = '%s', Required roles = "
                            "'%s'. current_user = '%s'",
                            current_user.get("roles", ""),
                            _roles, repr(current_user))
                abort_unauthorized("You do not have access to this resource. "
                                   "It requires role '%s'" % _roles)
            return fn(*args, **kwargs)
        return decorator
    return wrapper


# List of open endpoints, i.e. not requiring a valid JWT.
jwt.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def jwt_not_required(fn):
    log.debug("Registering open endpoint: %s", fn.__name__)
    _open_endpoints.add(fn)
    return fn
__init__.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def auth_audit_log(response):
    """On deployment remove the ``crossdomain`` decorator"""
    try:
        jdata = json.loads(request.data.decode())
        if 'password' in jdata:
            jdata['password'] = '*********'
        jdata_str = json.dumps(jdata)
    except ValueError:
        jdata_str = ''
    kwargs = {
        'module': auth.name,
        'user': current_user.name,
        'email': current_user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(jdata_str),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
__init__.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def api_audit_log(response):
    """Saves information about the request in the ``audit_log``

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    kwargs = {
        'module': api.name,
        'user': current_user.name,
        'email': current_user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(request.data.decode()),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    if not request.view_args and request.method.lower() == 'put':
        kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
models.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def all(self):
        try:
            return getattr(get_mailman_client(), pluralize(self.endpoint))
        except AttributeError:
            raise MailmanApiError
        except MailmanConnectionError as e:
            raise MailmanApiError(e)
models.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self, **kwargs):
        try:
            method = getattr(get_mailman_client(), 'get_' + self.endpoint)
            return method(**kwargs)
        except AttributeError as e:
            raise MailmanApiError(e)
        except HTTPError as e:
            raise
        except MailmanConnectionError as e:
            raise MailmanApiError(e)
models.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, endpoint, **kwargs):
        self.endpoint = endpoint
__init__.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def cp_audit_log(response):
    """Saves information about the request in the ``audit_log``

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    try:
        jdata = json.loads(request.data.decode())
        if 'password' in jdata:
            jdata['password'] = '*********'
        jdata_str = json.dumps(jdata)
    except ValueError:
        jdata_str = ''

    kwargs = {
        'module': cp.name,
        'user': g.user.name,
        'email': g.user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(jdata_str),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    if not request.view_args and request.method.lower() == 'put':
        kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response


问题


面经


文章

微信
公众号

扫码关注公众号