def paginate(query, schema):
page = request.args.get('page', DEFAULT_PAGE_NUMBER)
per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE)
page_obj = query.paginate(page=page, per_page=per_page)
next = url_for(
request.endpoint,
page=page_obj.next_num if page_obj.has_next else page_obj.page,
per_page=per_page,
**request.view_args
)
prev = url_for(
request.endpoint,
page=page_obj.prev_num if page_obj.has_prev else page_obj.page,
per_page=per_page,
**request.view_args
)
return {
'total': page_obj.total,
'pages': page_obj.pages,
'next': next,
'prev': prev,
'results': schema.dump(page_obj.items).data
}
python类view_args()的实例源码
def __init__(self, options, func, request_context):
self.options = options
self.operation = request.endpoint
self.func = func.__name__
self.method = request.method
self.args = request.args
self.view_args = request.view_args
self.request_context = request_context
self.timing = dict()
self.error = None
self.stack_trace = None
self.request_body = None
self.response_body = None
self.response_headers = None
self.status_code = None
self.success = None
def log_exception(self, exc_info):
self.logger.error("""
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User Browser: %s
User Browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % (
request.path,
request.method,
request.remote_addr,
request.user_agent.string,
request.user_agent.platform,
request.user_agent.browser,
request.user_agent.version,
dict(request.args),
request.view_args,
request.url
), exc_info=exc_info)
def bucket_required(f):
"""
Decorator to ensure that a valid bucket id is sent in the url path parameters
:param f:
:return:
"""
@wraps(f)
def decorated_function(*args, **kwargs):
bucket_id_ = request.view_args['bucket_id']
try:
int(bucket_id_)
except ValueError:
return response('failed', 'Provide a valid Bucket Id', 401)
return f(*args, **kwargs)
return decorated_function
def _auto_update(self, mapper=None):
cls = self.__class__
if mapper is None:
mapper = class_mapper(cls)
for col in mapper.columns:
# this order is the listed order except that hybrid properties go first
api_info = cls._get_api_info(col.name)
set_by = api_info['set_by']
if set_by == 'json':
if col.name in g.fields:
val = g.fields.get(col.name)
self._set_field_value(col.name, val)
elif set_by == 'url':
if col.name in request.view_args:
val = request.view_args.get(col.name)
self._set_field_value(col.name, val)
elif set_by == 'server':
# important not to let it try to set it from json
# requires a @setter decorated function
self._set_field_value(name=col.name, try_auto=False)
def get_obj(self):
args = request.args.to_dict()
args.update({self.pk_field: request.view_args[self.pk_url_kwarg]})
self.builder = PeeweeQueryBuilder(self.model, args)
try:
obj = self.builder.build().get()
except self.model.DoesNotExist:
obj = None
return obj
def to_dict(self):
dct = dict(
operation=self.operation,
func=self.func,
method=self.method,
**self.timing
)
if self.options.include_path and self.view_args:
dct.update({
key: value
for key, value in self.view_args.items()
})
if self.options.include_query_string and self.args:
dct.update({
key: values[0]
for key, values in self.args.lists()
if len(values) == 1 and is_uuid(values[0])
})
if self.request_context is not None:
dct.update(self.request_context())
if self.success is True:
dct.update(
success=self.success,
status_code=self.status_code,
)
if self.success is False:
dct.update(
success=self.success,
message=extract_error_message(self.error)[:2048],
context=extract_context(self.error),
stack_trace=self.stack_trace,
status_code=self.status_code,
)
self.post_process_request_body(dct)
self.post_process_response_body(dct)
self.post_process_response_headers(dct)
return dct
def api_audit_log(response):
"""Saves information about the request in the ``audit_log``
:param response: Server :class:`~flask.Response`
:return: :class:`~flask.Response`
"""
kwargs = {
'module': api.name,
'user': current_user.name,
'email': current_user.email,
'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
'data': addslashes(request.data.decode()),
'url': request.url,
'endpoint': request.endpoint,
'ip': request.remote_addr,
'status': response.status,
'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
}
if not request.view_args and request.method.lower() == 'put':
kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
entry = []
for k, v in kwargs.items():
entry.append('{0!s}="{1!s}"'.format(k, v))
entry = ' '.join(entry)
current_app.audit_log.info('{0!s}'.format(entry))
return response
def cp_audit_log(response):
"""Saves information about the request in the ``audit_log``
:param response: Server :class:`~flask.Response`
:return: :class:`~flask.Response`
"""
try:
jdata = json.loads(request.data.decode())
if 'password' in jdata:
jdata['password'] = '*********'
jdata_str = json.dumps(jdata)
except ValueError:
jdata_str = ''
kwargs = {
'module': cp.name,
'user': g.user.name,
'email': g.user.email,
'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
'data': addslashes(jdata_str),
'url': request.url,
'endpoint': request.endpoint,
'ip': request.remote_addr,
'status': response.status,
'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
}
if not request.view_args and request.method.lower() == 'put':
kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
entry = []
for k, v in kwargs.items():
entry.append('{0!s}="{1!s}"'.format(k, v))
entry = ' '.join(entry)
current_app.audit_log.info('{0!s}'.format(entry))
return response
def __init__(self, request, has_json):
self.request = request
self.query = request.args
self.path = request.view_args
self.headers = request.headers
if has_json:
self._json = self.request.get_json(force=True)
def user_cache_key():
"""?path paramaters???userid
"""
return 'cache::user::{}'.format(request.view_args['id'])
def validate_request(typ):
validate_method(request.method, typ['methods'], P_REQUEST)
validate_route(request.view_args, typ['route'], P_REQUEST)
validate_json(request.get_json(), typ['schema'], P_REQUEST, key='')
def build_url(self, **kwargs):
arg = request.args.copy()
view_args = request.view_args
arg.update(view_args)
for attr in kwargs.keys():
if attr in arg:
arg.pop(attr)
arg.update(kwargs.items())
rule = request.url_rule
result = rule.build(arg)
return result[1]
def before_request():
metadata = get_metadata(current_user)
if metadata:
logger.bind(tx_id=metadata['tx_id'])
values = request.view_args
logger.bind(eq_id=values['eq_id'], form_type=values['form_type'],
ce_id=values['collection_id'])
logger.info('questionnaire request', method=request.method, url_path=request.full_path)
if metadata:
g.schema_json = load_schema_from_metadata(metadata)
_check_same_survey(values['eq_id'], values['form_type'], values['collection_id'])
def _check_etag_and_lastmodified_for_i18n():
locale = request.view_args["locale"]
domain = request.view_args["domain"]
etag_ok = util.flask.check_etag(_compute_etag_for_i18n(request.view_args["locale"], request.view_args["domain"]))
lastmodified = _compute_date_for_i18n(locale, domain)
lastmodified_ok = lastmodified is None or util.flask.check_lastmodified(lastmodified)
return etag_ok and lastmodified_ok
def _dict(self):
mydict = {}
# manage timing
mydict['timing'] = {}
mydict['timing']['delta'] = self.timing
mydict['timing']['start'] = self.request._stats_start_event
mydict['timing']['asctime'] = asctime(gmtime(self.request._stats_start_event))
# manage flask
mydict['flask'] = {}
mydict['flask']['secret_key'] = current_app.config['SECRET_KEY']
mydict['flask']['server_name'] = current_app.config['SERVER_NAME']
mydict['flask']['session_cookie_name'] = current_app.config['SESSION_COOKIE_NAME']
mydict['flask']['session_cookie_domain'] = current_app.config['SESSION_COOKIE_DOMAIN']
mydict['flask']['session_cookie_path'] = current_app.config['SESSION_COOKIE_PATH']
mydict['flask']['session_cookie_httponly'] = current_app.config['SESSION_COOKIE_HTTPONLY']
mydict['flask']['session_cookie_secure'] = current_app.config['SESSION_COOKIE_SECURE']
mydict['flask']['session_refresh_each_request'] = current_app.config['SESSION_REFRESH_EACH_REQUEST']
# manage request
mydict['request'] = {}
mydict['request']['url'] = request.url
mydict['request']['args'] = {arg: request.args.get(arg) for arg in request.args}
mydict['request']['view_args'] = request.view_args
mydict['request']['path'] = request.path
mydict['request']['method'] = request.method
mydict['request']['remote_addr'] = request.remote_addr
try:
mydict['request']['rule'] = request.url_rule.rule
except:
mydict['request']['rule'] = ''
#manage response
mydict['response'] = {}
mydict['response']['status_code'] = self.response.status_code
mydict['response']['headers'] = { i:j for i,j in self.response.headers}
return mydict
def make_proxy_method(cls, name):
"""
Creates a proxy function that can be used by Flasks routing. The
proxy instantiates the FlaskView subclass and calls the appropriate
method.
:param name: the name of the method to create a proxy for
"""
i = cls()
view = getattr(i, name)
if cls.__decorators__:
for decorator in cls.__decorators__:
view = decorator(view)
@functools.wraps(view)
def proxy(**forgettable_view_args):
# Always use the global request object's view_args, because they
# can be modified by intervening function before an endpoint or
# wrapper gets called. This matches Flask's behavior.
del forgettable_view_args
response = view(**request.view_args)
if not isinstance(response, Response):
response = make_response(response)
return response
return proxy
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
if content is None or content == '':
return None
body = ''
if '\n' in content:
title, body = content.split('\n', 1)
body += '\n\n'
else:
title = content
body += 'Reported on {location} by {author}'.format(location=location, author=author)
if request:
body += textwrap.dedent("""
--------------------------------------------------------------------------------
Request Method: {method}
Path: {full_path}
Cookies: {cookies}
Endpoint: {endpoint}
View Args: {view_args}
Person: {id}
User-Agent: {user_agent}
Referrer: {referrer}
""".format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer))
print(title + '\n' + body)
# Only check for github details at the last second to get log output even if github not configured.
if not configuration.get('github_user') or not configuration.get('github_password'):
return None
g = Github(configuration.get('github_user'), configuration.get('github_password'))
repo = g.get_repo(repo)
issue = repo.create_issue(title=title, body=body)
return issue
def get_object(self):
query = self.get_query()
query = self.filter_query(query)
model_class = self.get_model_class()
id_type = inspect(model_class).columns['id'].type
obj_id = request.view_args['id']
if isinstance(id_type, Integer):
try:
obj_id = int(obj_id)
except ValueError:
raise NotFound()
elif isinstance(id_type, postgresql.UUID):
try:
# Check the ID is a valid UUID so don't try
# to query with an invalid UUID.
uuid.UUID(obj_id)
except ValueError:
raise NotFound()
query = query.filter(model_class.id == obj_id)
try:
obj = query.one()
except NoResultFound:
raise NotFound()
self.check_object_permissions(obj)
return obj
def get(self, *args, **kwargs):
"""Retrieve a collection of objects"""
self.before_get(args, kwargs)
qs = QSManager(request.args, self.schema)
objects_count, objects = self._data_layer.get_collection(qs, kwargs)
schema_kwargs = getattr(self, 'get_schema_kwargs', dict())
schema_kwargs.update({'many': True})
schema = compute_schema(self.schema,
schema_kwargs,
qs,
qs.include)
result = schema.dump(objects).data
view_kwargs = request.view_args if getattr(self, 'view_kwargs', None) is True else dict()
add_pagination_links(result,
objects_count,
qs,
url_for(self.view, **view_kwargs))
result.update({'meta': {'count': objects_count}})
self.after_get(result)
return result