def get_data(self):
    """
    Get request data based on request.method and request.mimetype.

    GET and DELETE requests read the query-string arguments. Any other
    method reads the JSON body when the mimetype is 'application/json',
    and the submitted form otherwise.

    Returns:
        A regular dict which can be modified (scheme will modify data
        on validating). For JSON requests this is whatever mapping
        ``request.get_json()`` produced.
    """
    # Imported locally: the ``collections.Mapping`` alias was removed in
    # Python 3.10, the ABC now only lives in ``collections.abc``.
    from collections.abc import Mapping

    if request.method in ['GET', 'DELETE']:
        return request.args.to_dict()

    if request.mimetype == 'application/json':
        data = request.get_json()
        if not isinstance(data, Mapping):
            # NOTE(review): presumably handle_error aborts the request;
            # if it returns, the non-mapping value is returned below.
            self.handle_error('JSON content must be object')
        return data

    return request.form.to_dict()
python类mimetype()的实例源码
def from_request(cls, request, app, name):
    """Build a fixture from an incoming request.

    The fixture is created from the request's raw data, the given name,
    the application name, and the request's path and method, with
    ``is_response=False``.

    Raises:
        TypeError: if ``Fixture.is_supported`` rejects the request's
            mimetype.
    """
    if Fixture.is_supported(request.mimetype):
        return cls(request.data, name, app.name, request.path,
                   request.method, is_response=False)
    raise TypeError
def from_response(cls, response, app, name):
    """Build a fixture from an outgoing response.

    The fixture is created from the response body, the given name and
    the application name.

    Raises:
        TypeError: if ``Fixture.is_supported`` rejects the response's
            mimetype.
    """
    if Fixture.is_supported(response.mimetype):
        # ``request`` is not a parameter of this function; it resolves to
        # a name from the surrounding module (presumably the framework's
        # current-request object -- confirm against the file's imports).
        return cls(response.get_data(), name, app.name, request.path,
                   request.method)
    raise TypeError
def parse_body(self):
    """Return the request body parsed according to its mimetype.

    * 'application/graphql': the UTF-8 decoded body under the 'query' key.
    * 'application/json': the result of ``load_json_body`` on the decoded
      body.
    * URL-encoded or multipart forms: ``request.form``.
    * Anything else: an empty dict.
    """
    # The mimetype is enough here; the extra parameters carried by
    # content_type are not needed.
    mimetype = request.mimetype

    if mimetype == 'application/graphql':
        graphql_text = request.data.decode('utf8')
        return {'query': graphql_text}

    if mimetype == 'application/json':
        json_text = request.data.decode('utf8')
        return load_json_body(json_text)

    if mimetype in ('application/x-www-form-urlencoded', 'multipart/form-data'):
        return request.form

    return {}
def parse_command_args(response: 'Response') -> typing.Tuple[str, str]:
    """
    Extract the command name and its argument string from the request.

    The 'command' request argument may carry inline arguments after the
    first space (e.g. ``"run foo bar"``). Those are appended, separated by
    a single space, to the value of the 'args' request argument.

    :param response:
        The response object to modify with status or error data
    :return:
        A tuple where the first element is the name of the command
        to execute, and the second is a string representing the arguments
        to apply to that command. If parsing fails, the failure is
        recorded on the response and whatever was parsed so far (possibly
        None values) is returned.
    """
    cmd = None
    parts = None
    name = None
    args = None
    request_args = arguments.from_request()

    try:
        cmd = request_args.get('command', '')
        parts = [x.strip() for x in cmd.split(' ', 1)]
        name = parts[0].lower()

        args = request_args.get('args', '')
        if not isinstance(args, str):
            args = ' '.join(args)

        # Append arguments that were given inline with the command. The
        # separator is only added when both sides are non-empty; the
        # previous ``' {}'.format(...).strip()`` stripped the separator
        # itself and glued the two argument strings together.
        inline_args = parts[1] if len(parts) > 1 else ''
        if inline_args:
            args = '{} {}'.format(args, inline_args) if args else inline_args
    except Exception as err:
        # Report everything gathered so far to ease debugging of the
        # malformed request.
        response.fail(
            code='INVALID_COMMAND',
            message='Unable to parse command',
            cmd=cmd if cmd else '',
            parts=parts,
            name=name,
            args=args,
            error=err,
            mime_type='{}'.format(request.mimetype),
            request_data='{}'.format(request.data),
            request_args=request_args
        )

    return name, args
def badger():
    """Grant or revoke a role on behalf of the current service account.

    Expects an 'application/json' request whose body is an object with the
    keys 'action', 'user_email' and 'role' (each defaults to '').

    Returns:
        The result of ``do_badger`` when the role is listed under the
        current user's ``service.badger`` field, otherwise a
        ``(message, 403)`` tuple.

    Raises:
        wz_exceptions.BadRequest: if the mimetype is not
            'application/json' or the JSON payload is not an object.
    """
    if request.mimetype != 'application/json':
        log.debug('Received %s instead of application/json', request.mimetype)
        raise wz_exceptions.BadRequest()

    # Parse the request
    args = request.json
    if not isinstance(args, dict):
        # Valid JSON that is not an object (null, list, string, ...) has no
        # .get(); reject it as a client error instead of crashing below.
        log.debug('Received JSON %s instead of an object', type(args).__name__)
        raise wz_exceptions.BadRequest()

    action = args.get('action', '')
    user_email = args.get('user_email', '')
    role = args.get('role', '')

    current_user_id = authentication.current_user_id()
    log.info('Service account %s %ss role %r to/from user %s',
             current_user_id, action, role, user_email)

    users_coll = current_app.data.driver.db['users']

    # Check that the user is allowed to grant this role.
    srv_user = users_coll.find_one(current_user_id,
                                   projection={'service.badger': 1})
    if srv_user is None:
        log.error('badger(%s, %s, %s): current user %s not found -- how did they log in?',
                  action, user_email, role, current_user_id)
        return 'User not found', 403

    allowed_roles = set(srv_user.get('service', {}).get('badger', []))
    if role not in allowed_roles:
        log.warning('badger(%s, %s, %s): service account not authorized to %s role %s',
                    action, user_email, role, action, role)
        return 'Role not allowed', 403

    return do_badger(action, role=role, user_email=user_email)
def create_project(overrides=None):
    """Create a new project for the current user and return it.

    The project name is read from the 'name' key of the JSON body when the
    request mimetype is 'application/json', and from the 'project_name'
    form field otherwise. ``overrides`` is passed through unchanged to
    ``utils.create_new_project``.

    The response carries status 201 and a 'Location' header pointing at
    the new project's item lookup URL.
    """
    is_json = request.mimetype == 'application/json'
    project_name = request.json['name'] if is_json else request.form['project_name']

    new_project = utils.create_new_project(
        project_name, current_user.user_id, overrides)

    # Return the project in the response.
    location = url_for('projects|item_lookup', _id=new_project['_id'])
    return jsonify(new_project, status=201, headers={'Location': location})