def handle_no_auth(request):
return response.json(dict(message='unauthorized'), status=401)
python类json()的实例源码
def api_profile(request, user):
return response.json(dict(id=user.id, name=user.name))
def _validate_field_types(self, request):
shortcuts = self.model.shortcuts
response_messages = self.config.response_messages
fields = shortcuts.editable_fields
request_data = request.json
for key, value in request_data.items():
expected_type = fields.get(key).db_field
if expected_type in ['int', 'bool']:
try:
int(value)
except (ValueError, TypeError):
if expected_type == 'int':
message = response_messages.ErrorTypeInteger.format(value)
else:
message = response_messages.ErrorTypeBoolean.format(value)
return self.response_json(status_code=400, message=message)
return True
def _validate_field_size(self, request):
shortcuts = self.model.shortcuts
response_messages = self.config.response_messages
request_data = request.json
for key, value in request_data.items():
field_type = shortcuts.editable_fields.get(key).db_field
if field_type == 'int':
min_size = -2147483647
max_size = 2147483647
elif field_type == 'bigint':
min_size = -9223372036854775808
max_size = 9223372036854775807
else:
continue
if not min_size <= value <= max_size:
return self.response_json(status_code=400,
message=response_messages.ErrorFieldOutOfRange.format(key, min_size, max_size))
return True
def post_login(request):
token = request.json['auth_token']
url = '{}/v1/login/verify/{}'.format(ID_SERVICE_LOGIN_URL, token)
resp = await app.http.get(url)
if resp.status != 200:
raise SanicException("Login Failed", status_code=401)
user = await resp.json()
toshi_id = user['toshi_id']
session_id = generate_session_id()
async with app.pool.acquire() as con:
admin = await con.fetchrow("SELECT * FROM admins WHERE toshi_id = $1", toshi_id)
if admin:
await con.execute("INSERT INTO sessions (session_id, toshi_id) VALUES ($1, $2)",
session_id, toshi_id)
if admin:
response = json_response(user)
response.cookies['session'] = session_id
#response.cookies['session']['secure'] = True
return response
else:
toshi_log.info("Invalid login from: {}".format(toshi_id))
raise SanicException("Login Failed", status_code=401)
def test_storage():
app = Sanic('test_text')
@app.middleware('request')
def store(request):
request['user'] = 'sanic'
request['sidekick'] = 'tails'
del request['sidekick']
@app.route('/')
def handler(request):
return json({ 'user': request.get('user'), 'sidekick': request.get('sidekick') })
request, response = sanic_endpoint_test(app)
response_json = loads(response.text)
assert response_json['user'] == 'sanic'
assert response_json.get('sidekick') is None
def test_list_default():
app = Sanic('test_get')
app.blueprint(openapi_blueprint)
@app.put('/test')
@doc.consumes(doc.List(int, description="All the numbers"), location="body")
def test(request):
return json({"test": True})
request, response = app.test_client.get('/openapi/spec.json')
response_schema = json_loads(response.body.decode())
parameter = response_schema['paths']['/test']['put']['parameters'][0]
assert response.status == 200
assert parameter['type'] == 'array'
assert parameter['items']['type'] == 'integer'
def __call__(self, view_func, *_args, **_kwargs):
async def wrapped_view(*args, **kwargs):
request = None
if 'request' in kwargs:
request = kwags['request']
for arg in args:
if type(arg) == Request:
request = arg
break
if not request:
raise ServerError('No request found!')
if request['session'].get('user', None):
return await view_func(*args, **kwargs)
if self.redirect:
return login_redirect(request)
return json({'message': 'access denied'}, status=403)
return wrapped_view
def delete(self, request, table_name=None):
"""Delete endpoint.
:param request: Sanic Request.
:param table_name: Name of the table to access.
"""
if not check_csrf(request):
return json({'message': 'access denied'}, status=403)
waf = get_jawaf()
table = registry.get(table_name)
target_id = request.json.get('id', None)
if not target_id:
return json({'message': 'no id'}, status=400)
if not table:
return json({'message': 'access denied'}, status=403)
async with Connection(table['database']) as con:
stmt = table['table'].delete().where(table['table'].c.id==target_id)
await con.execute(stmt)
await add_audit_action('delete', 'admin', table_name, request['session']['user'])
return json({'message': 'success'}, status=200)
def get(self, request, table_name=None):
"""Get endpoint. Retrieve one object by id (url param `id=`)
:param request: Sanic Request.
:param table_name: Name of the table to access.
"""
waf = get_jawaf()
table = registry.get(table_name)
try:
target_id = int(request.raw_args.get('id', None))
except:
return json({'message': 'no id'}, status=400)
if not table:
return json({'message': 'access denied'}, status=403)
async with Connection(table['database']) as con:
query = sa.select('*').select_from(table['table']).where(table['table'].c.id==target_id)
result = await con.fetchrow(query)
if not result:
return json({'message': 'not found', 'data': None}, status=404)
return json({'message': 'success', 'data': result}, status=200)
def patch(self, request, table_name=None):
"""Patch endpoint. Partially edit a row.
:param request: Sanic Request.
:param table_name: Name of the table to access.
"""
if not check_csrf(request):
return json({'message': 'access denied'}, status=403)
waf = get_jawaf()
table = registry.get(table_name)
target_id = request.json.get('id', None)
if not target_id:
return json({'message': 'no id'}, status=400)
if not table:
return json({'message': 'access denied'}, status=403)
request.json.pop('id')
async with Connection(table['database']) as con:
stmt = table['table'].update().where(table['table'].c.id==target_id).values(**request.json)
await con.execute(stmt)
await add_audit_action('put', 'admin', table_name, request['session']['user'])
return json({'message': 'success'}, status=200)
def post(self, request, table_name=None):
"""Post endpoint. Create a new row.
:param request: Sanic Request.
:param table_name: Name of the table to access.
"""
if not check_csrf(request):
return json({'message': 'access denied'}, status=403)
waf = get_jawaf()
table = registry.get(table_name)
if not table:
return json({'message': 'access denied'}, status=403)
async with Connection(table['database']) as con:
stmt = table['table'].insert().values(**request.json)
await con.execute(stmt)
await add_audit_action('post', 'admin', table_name, request['session']['user'])
return json({'message': 'success'}, status=201)
def scoped(scopes, require_all=True, require_all_actions=True):
def decorator(f):
@wraps(f)
async def decorated_function(request, *args, **kwargs):
# Retrieve the scopes from the payload
user_scopes = request.app.auth.retrieve_scopes(request)
if user_scopes is None:
# If there are no defined scopes in the payload, deny access
is_authorized = False
else:
is_authorized = validate_scopes(request, scopes, user_scopes, require_all, require_all_actions)
if is_authorized:
# the user is authorized.
# run the handler method and return the response
response = await f(request, *args, **kwargs)
return response
else:
# the user is not authorized.
return json({
'status': 'not_authorized',
}, 403)
return response
return decorated_function
return decorator
def get_token_reponse(request, access_token, output, refresh_token=None):
response = json(output)
if request.app.config.SANIC_JWT_COOKIE_SET:
key = request.app.config.SANIC_JWT_COOKIE_TOKEN_NAME
response.cookies[key] = str(access_token, 'utf-8')
response.cookies[key]['domain'] = request.app.config.SANIC_JWT_COOKIE_DOMAIN
response.cookies[key]['httponly'] = request.app.config.SANIC_JWT_COOKIE_HTTPONLY
if refresh_token and request.app.config.SANIC_JWT_REFRESH_TOKEN_ENABLED:
key = request.app.config.SANIC_JWT_COOKIE_REFRESH_TOKEN_NAME
response.cookies[key] = refresh_token
response.cookies[key]['domain'] = request.app.config.SANIC_JWT_COOKIE_DOMAIN
response.cookies[key]['httponly'] = request.app.config.SANIC_JWT_COOKIE_HTTPONLY
return response
def get_messages(id):
result = {}
try:
messages = mail.Message.filter(address=id)
if (len(messages) == 0):
raise NotFound('could not find messages for email %s' % ( id))
result['count'] = len(messages)
msg_list = []
for msg in messages:
msg_list.append(msg.toJson())
result['messages'] = msg_list
except (mail.Message.DoesNotExist, KeyError):
raise NotFound('could not find messages for email %s' % ( id))
except (mail.Email.DoesNotExist, KeyError):
raise NotFound('could not find email with address %s' % (id))
return json(result)
def get(request):
'''
Generate x number of emails
'''
count = int(get_post_param(request, 'count'))
try:
if count <= 0:
raise InvalidUsage('count must be greater than 0')
except KeyError:
raise InvalidUsage('count must be present')
live_emails = mail.Email.all()
logger.debug("Generating %s emails.", count)
email_gen = utils.generate_emails(count)
emails = []
addresses = []
for n in range(count):
email = next(email_gen)
emails.append(email)
addresses.append(email.address)
db_service.batch_save(emails)
return json({'accounts': addresses, "total_active": len(live_emails) + count}, status = 201)
saniconeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def stickers_api_packs_stickers(self, request, packs, stickers):
#packs = parse.unquote(packs)
#stickers = parse.unquote(stickers)
resp = {"packs": None,
"stickers": None}
resp["packs"] = packs
resp["stickers"] = stickers
resp.update({"comment": "Get sticker info"})
resp.update({"name": resp["stickers"]})
print(resp["stickers"])
sticker = await self.db.get_stickers_by_name(resp["stickers"])
print(sticker)
resp.update({"url": sticker[0][2]})
resp.update({"id": sticker[0][0]})
resp.update({"rating": sticker[0][1]})
resp.update({"pack_url": sticker[0][3]})
return json(resp)
def test_form(test_cli):
resp = yield from test_cli.post('/form/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
body, mct = helpers.encode_multipart([('a', '2')])
resp = yield from test_cli.post('/form/', data=body,
headers={'Content-Type': mct})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
resp = yield from test_cli.post('/cbv/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
def test_form(test_cli):
resp = yield from test_cli.post('/form/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
body, mct = helpers.encode_multipart([('a', '2')])
resp = yield from test_cli.post('/form/', data=body,
headers={'Content-Type': mct})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
resp = yield from test_cli.post('/cbv/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
def test_form(test_cli):
resp = yield from test_cli.post('/form/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
body, mct = helpers.encode_multipart([('a', '2')])
resp = yield from test_cli.post('/form/', data=body,
headers={'Content-Type': mct})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
resp = yield from test_cli.post('/cbv/', data={'a': 2})
assert resp.status == 200
resp_json = yield from resp.json()
assert resp_json == {'a': 2}
def owllook_logout(request):
"""
????
:param request:
:return:
: 0 ????
: 1 ????
"""
user = request['session'].get('user', None)
if user:
response = json({'status': 1})
del response.cookies['user']
del response.cookies['owl_sid']
return response
else:
return json({'status': 0})
def owllook_delete_bookmark(request):
"""
????
:param request:
:return:
: -1 ??session?? ??????
: 0 ??????
: 1 ??????
"""
user = request['session'].get('user', None)
data = parse_qs(str(request.body, encoding='utf-8'))
bookmarkurl = data.get('bookmarkurl', '')
if user and bookmarkurl:
bookmark = unquote(bookmarkurl[0])
try:
motor_db = motor_base.get_db()
await motor_db.user_message.update_one({'user': user},
{'$pull': {'bookmarks': {"bookmark": bookmark}}})
LOGGER.info('??????')
return json({'status': 1})
except Exception as e:
LOGGER.exception(e)
return json({'status': 0})
else:
return json({'status': -1})
def change_email(request):
"""
??????
:param request:
:return:
: -1 ??session?? ??????
: 0 ??????
: 1 ??????
"""
user = request['session'].get('user', None)
data = parse_qs(str(request.body, encoding='utf-8'))
if user:
try:
email = data.get('email', None)[0]
motor_db = motor_base.get_db()
await motor_db.user.update_one({'user': user},
{'$set': {'email': email}})
LOGGER.info('??????')
return json({'status': 1})
except Exception as e:
LOGGER.exception(e)
return json({'status': 0})
else:
return json({'status': -1})
def authorized():
def decorator(f):
@wraps(f)
async def decorated_function(request, *args, **kwargs):
# run some method that checks the request
# for the client's authorization status
is_authorized = check_request_for_authorization_status(request)
if is_authorized:
# the user is authorized.
# run the handler method and return the response
response = await f(request, *args, **kwargs)
return response
else:
# the user is not authorized.
return json({'status': 'not_authorized'}, 403)
return decorated_function
return decorator
def test_utf8_post_json():
app = Sanic('test_utf8_post_json')
@app.route('/')
async def handler(request):
return text('OK')
payload = {'test': '?'}
headers = {'content-type': 'application/json'}
request, response = app.test_client.get(
'/',
data=json_dumps(payload), headers=headers)
assert request.json.get('test') == '?'
assert response.text == 'OK'
def test_match_info():
app = Sanic('test_match_info')
@app.route('/api/v1/user/<user_id>/')
async def handler(request, user_id):
return json(request.match_info)
request, response = app.test_client.get('/api/v1/user/sanic_user/')
assert request.match_info == {"user_id": "sanic_user"}
assert json_loads(response.text) == {"user_id": "sanic_user"}
# ------------------------------------------------------------ #
# POST
# ------------------------------------------------------------ #
def test_method_not_allowed():
app = Sanic('method_not_allowed')
@app.get('/')
async def test(request):
return response.json({'hello': 'world'})
request, response = app.test_client.head('/')
assert response.headers['Allow']== 'GET'
@app.post('/')
async def test(request):
return response.json({'hello': 'world'})
request, response = app.test_client.head('/')
assert response.status == 405
assert set(response.headers['Allow'].split(', ')) == set(['GET', 'POST'])
assert response.headers['Content-Length'] == '0'
def test_app_injection():
app = Sanic('test_app_injection')
expected = random.choice(range(0, 100))
@app.listener('after_server_start')
async def inject_data(app, loop):
app.injected = expected
@app.get('/')
async def handler(request):
return json({'injected': request.app.injected})
request, response = app.test_client.get('/')
response_json = loads(response.text)
assert response_json['injected'] == expected
def signin(req):
if any(map(lambda key: key not in req.json, ["login", "password"])):
logger.debug(f"Request is {req.json} but some arguments are missing.")
raise InvalidUsage("Missing argument")
user = await User.get_by_login(req.json["login"])
if user is None:
logger.debug(f"Request is {req.json} but user coundn't be found.")
raise NotFound("User not found")
if await accounts.is_frozen(user.id, req.ip):
logger.debug(f"Request is {req.json} but the account is frozen.")
raise InvalidUsage("Account frozen")
if not compare_digest(user.password, User.hashpwd(req.json["password"])):
logger.debug(f"Request is {req.json} but the password is invalid.")
unfreeze = await accounts.freeze(user.id, req.ip)
raise InvalidUsage("Invalid password. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds"))
await accounts.unfreeze(user.id, req.ip)
token = await accounts.register(user.id)
logger.info(f"User {user.name} connected. Token generated: {token}")
return json({"token": token, "id": user.id, "name": user.name})
def refresh(req):
if "id" not in req.json or "token" not in req.json:
logger.debug(f"Request is {req.json} but some arguments are missing.")
raise InvalidUsage("Missing argument")
user = User.get_by_id(req.json["id"])
if user is None:
logger.debug(f"Request is {req.json} but user coundn't be found.")
raise NotFound("User not found")
if await accounts.is_frozen(user.id, req.ip):
logger.debug(f"Request is {req.json} but the account is frozen.")
raise InvalidUsage("Account frozen")
if not await accounts.is_valid(req.json["id"], req.json["token"]):
logger.debug(f"Request is {req.json} but the token is invalid or expirated.")
await accounts.freeze(user.id, req.ip)
raise NotFound("Token not found or expirated")
await accounts.unfreeze(user.id, req.ip)
logger.info(f"User {user.name} refreshed it's token: {req.json['token']}")
return json({"token": req.json["token"], "id": user.id, "name": user.name})