python类json()的实例源码

note.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_no_auth(request):
    return response.json(dict(message='unauthorized'), status=401)
note.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def api_profile(request, user):
    return response.json(dict(id=user.id, name=user.name))
base_resource.py 文件源码 项目:sanic_crud 作者: Typhon66 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _validate_field_types(self, request):
        shortcuts = self.model.shortcuts
        response_messages = self.config.response_messages
        fields = shortcuts.editable_fields
        request_data = request.json

        for key, value in request_data.items():
            expected_type = fields.get(key).db_field

            if expected_type in ['int', 'bool']:
                try:
                    int(value)
                except (ValueError, TypeError):
                    if expected_type == 'int':
                        message = response_messages.ErrorTypeInteger.format(value)
                    else:
                        message = response_messages.ErrorTypeBoolean.format(value)

                    return self.response_json(status_code=400, message=message)

        return True
base_resource.py 文件源码 项目:sanic_crud 作者: Typhon66 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _validate_field_size(self, request):
        shortcuts = self.model.shortcuts
        response_messages = self.config.response_messages
        request_data = request.json

        for key, value in request_data.items():
            field_type = shortcuts.editable_fields.get(key).db_field
            if field_type == 'int':
                min_size = -2147483647
                max_size = 2147483647
            elif field_type == 'bigint':
                min_size = -9223372036854775808
                max_size = 9223372036854775807
            else:
                continue

            if not min_size <= value <= max_size:
                return self.response_json(status_code=400,
                                          message=response_messages.ErrorFieldOutOfRange.format(key, min_size, max_size))

        return True
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_login(request):
    token = request.json['auth_token']
    url = '{}/v1/login/verify/{}'.format(ID_SERVICE_LOGIN_URL, token)
    resp = await app.http.get(url)
    if resp.status != 200:
        raise SanicException("Login Failed", status_code=401)

    user = await resp.json()
    toshi_id = user['toshi_id']
    session_id = generate_session_id()
    async with app.pool.acquire() as con:
        admin = await con.fetchrow("SELECT * FROM admins WHERE toshi_id = $1", toshi_id)
        if admin:
            await con.execute("INSERT INTO sessions (session_id, toshi_id) VALUES ($1, $2)",
                              session_id, toshi_id)
    if admin:
        response = json_response(user)
        response.cookies['session'] = session_id
        #response.cookies['session']['secure'] = True
        return response
    else:
        toshi_log.info("Invalid login from: {}".format(toshi_id))
        raise SanicException("Login Failed", status_code=401)
test_request_data.py 文件源码 项目:annotated-py-sanic 作者: hhstore 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_storage():
    app = Sanic('test_text')

    @app.middleware('request')
    def store(request):
        request['user'] = 'sanic'
        request['sidekick'] = 'tails'
        del request['sidekick']

    @app.route('/')
    def handler(request):
        return json({ 'user': request.get('user'), 'sidekick': request.get('sidekick') })

    request, response = sanic_endpoint_test(app)

    response_json = loads(response.text)
    assert response_json['user'] == 'sanic'
    assert response_json.get('sidekick') is None
test_types.py 文件源码 项目:sanic-openapi 作者: channelcat 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_list_default():
    app = Sanic('test_get')

    app.blueprint(openapi_blueprint)

    @app.put('/test')
    @doc.consumes(doc.List(int, description="All the numbers"), location="body")
    def test(request):
        return json({"test": True})

    request, response = app.test_client.get('/openapi/spec.json')

    response_schema = json_loads(response.body.decode())
    parameter = response_schema['paths']['/test']['put']['parameters'][0]

    assert response.status == 200
    assert parameter['type'] == 'array'
    assert parameter['items']['type'] == 'integer'
decorators.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __call__(self, view_func, *_args, **_kwargs):
        async def wrapped_view(*args, **kwargs):
            request = None
            if 'request' in kwargs:
                request = kwags['request']
            for arg in args:
                if type(arg) == Request:
                    request = arg
                    break
            if not request:
                raise ServerError('No request found!')
            if request['session'].get('user', None):
                return await view_func(*args, **kwargs)
            if self.redirect:
                return login_redirect(request)
            return json({'message': 'access denied'}, status=403)
        return wrapped_view
views.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete(self, request, table_name=None):
        """Delete endpoint.
        :param request: Sanic Request.
        :param table_name: Name of the table to access.
        """
        if not check_csrf(request):
            return json({'message': 'access denied'}, status=403)
        waf = get_jawaf()
        table = registry.get(table_name)
        target_id = request.json.get('id', None)
        if not target_id:
            return json({'message': 'no id'}, status=400)
        if not table:
            return json({'message': 'access denied'}, status=403)
        async with Connection(table['database']) as con:
            stmt = table['table'].delete().where(table['table'].c.id==target_id)
            await con.execute(stmt)
        await add_audit_action('delete', 'admin', table_name, request['session']['user'])
        return json({'message': 'success'}, status=200)
views.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, request, table_name=None):
        """Get endpoint. Retrieve one object by id (url param `id=`)
        :param request: Sanic Request.
        :param table_name: Name of the table to access.
        """
        waf = get_jawaf()
        table = registry.get(table_name)
        try:
            target_id = int(request.raw_args.get('id', None))
        except:
            return json({'message': 'no id'}, status=400)
        if not table:
            return json({'message': 'access denied'}, status=403)
        async with Connection(table['database']) as con:
            query = sa.select('*').select_from(table['table']).where(table['table'].c.id==target_id)
            result = await con.fetchrow(query)
            if not result:
                return json({'message': 'not found', 'data': None}, status=404)
            return json({'message': 'success', 'data': result}, status=200)
views.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def patch(self, request, table_name=None):
        """Patch endpoint. Partially edit a row.
        :param request: Sanic Request.
        :param table_name: Name of the table to access.
        """
        if not check_csrf(request):
            return json({'message': 'access denied'}, status=403)
        waf = get_jawaf()
        table = registry.get(table_name)
        target_id = request.json.get('id', None)
        if not target_id:
            return json({'message': 'no id'}, status=400)
        if not table:
            return json({'message': 'access denied'}, status=403)
        request.json.pop('id')
        async with Connection(table['database']) as con:
            stmt = table['table'].update().where(table['table'].c.id==target_id).values(**request.json)
            await con.execute(stmt)
        await add_audit_action('put', 'admin', table_name, request['session']['user'])
        return json({'message': 'success'}, status=200)
views.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def post(self, request, table_name=None):
        """Post endpoint. Create a new row.
        :param request: Sanic Request.
        :param table_name: Name of the table to access.
        """
        if not check_csrf(request):
            return json({'message': 'access denied'}, status=403)
        waf = get_jawaf()
        table = registry.get(table_name)
        if not table:
            return json({'message': 'access denied'}, status=403)
        async with Connection(table['database']) as con:
            stmt = table['table'].insert().values(**request.json)
            await con.execute(stmt)
        await add_audit_action('post', 'admin', table_name, request['session']['user'])
        return json({'message': 'success'}, status=201)
decorators.py 文件源码 项目:sanic-jwt 作者: ahopkins 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def scoped(scopes, require_all=True, require_all_actions=True):
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Retrieve the scopes from the payload
            user_scopes = request.app.auth.retrieve_scopes(request)
            if user_scopes is None:
                # If there are no defined scopes in the payload, deny access
                is_authorized = False
            else:
                is_authorized = validate_scopes(request, scopes, user_scopes, require_all, require_all_actions)

            if is_authorized:
                # the user is authorized.
                # run the handler method and return the response
                response = await f(request, *args, **kwargs)
                return response
            else:
                # the user is not authorized.
                return json({
                    'status': 'not_authorized',
                }, 403)
            return response
        return decorated_function
    return decorator
blueprint.py 文件源码 项目:sanic-jwt 作者: ahopkins 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_token_reponse(request, access_token, output, refresh_token=None):
    response = json(output)

    if request.app.config.SANIC_JWT_COOKIE_SET:
        key = request.app.config.SANIC_JWT_COOKIE_TOKEN_NAME
        response.cookies[key] = str(access_token, 'utf-8')
        response.cookies[key]['domain'] = request.app.config.SANIC_JWT_COOKIE_DOMAIN
        response.cookies[key]['httponly'] = request.app.config.SANIC_JWT_COOKIE_HTTPONLY

        if refresh_token and request.app.config.SANIC_JWT_REFRESH_TOKEN_ENABLED:
            key = request.app.config.SANIC_JWT_COOKIE_REFRESH_TOKEN_NAME
            response.cookies[key] = refresh_token
            response.cookies[key]['domain'] = request.app.config.SANIC_JWT_COOKIE_DOMAIN
            response.cookies[key]['httponly'] = request.app.config.SANIC_JWT_COOKIE_HTTPONLY

    return response
api.py 文件源码 项目:homingbot 作者: homingbot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_messages(id):
    result = {}
    try:
        messages = mail.Message.filter(address=id)
        if (len(messages) == 0):
            raise NotFound('could not find messages for email %s' % ( id))

        result['count'] = len(messages)
        msg_list = []
        for msg in messages:
            msg_list.append(msg.toJson())
        result['messages'] = msg_list
    except (mail.Message.DoesNotExist, KeyError):
        raise NotFound('could not find messages for email %s' % ( id))
    except (mail.Email.DoesNotExist, KeyError):
        raise NotFound('could not find email with address %s' % (id))
    return json(result)
api.py 文件源码 项目:homingbot 作者: homingbot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(request):
    '''
    Generate x number of emails
    '''
    count = int(get_post_param(request, 'count'))
    try:
        if count <= 0:
            raise InvalidUsage('count must be greater than 0')
    except KeyError:
        raise InvalidUsage('count must be present')
    live_emails = mail.Email.all()
    logger.debug("Generating %s emails.", count)
    email_gen = utils.generate_emails(count)
    emails = []
    addresses = []
    for n in range(count):
        email = next(email_gen)
        emails.append(email)
        addresses.append(email.address)
    db_service.batch_save(emails)
    return json({'accounts': addresses, "total_active": len(live_emails) + count}, status = 201)
saniconeconnect.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def stickers_api_packs_stickers(self, request, packs, stickers):
        #packs = parse.unquote(packs)
        #stickers = parse.unquote(stickers)
        resp = {"packs": None,
                "stickers": None}
        resp["packs"] = packs
        resp["stickers"] = stickers
        resp.update({"comment": "Get sticker info"})
        resp.update({"name": resp["stickers"]})
        print(resp["stickers"])
        sticker = await self.db.get_stickers_by_name(resp["stickers"])
        print(sticker)
        resp.update({"url": sticker[0][2]})
        resp.update({"id": sticker[0][0]})
        resp.update({"rating": sticker[0][1]})
        resp.update({"pack_url": sticker[0][3]})
        return json(resp)
test_sanic.py 文件源码 项目:covador 作者: baverman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_form(test_cli):
    resp = yield from test_cli.post('/form/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    body, mct = helpers.encode_multipart([('a', '2')])
    resp = yield from test_cli.post('/form/', data=body,
                                    headers={'Content-Type': mct})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    resp = yield from test_cli.post('/cbv/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}
test_sanic.py 文件源码 项目:covador 作者: baverman 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_form(test_cli):
    resp = yield from test_cli.post('/form/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    body, mct = helpers.encode_multipart([('a', '2')])
    resp = yield from test_cli.post('/form/', data=body,
                                    headers={'Content-Type': mct})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    resp = yield from test_cli.post('/cbv/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}
test_sanic.py 文件源码 项目:covador 作者: baverman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_form(test_cli):
    resp = yield from test_cli.post('/form/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    body, mct = helpers.encode_multipart([('a', '2')])
    resp = yield from test_cli.post('/form/', data=body,
                                    headers={'Content-Type': mct})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}

    resp = yield from test_cli.post('/cbv/', data={'a': 2})
    assert resp.status == 200
    resp_json = yield from resp.json()
    assert resp_json == {'a': 2}
operate_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def owllook_logout(request):
    """
    ????
    :param request:
    :return:
        :   0   ????
        :   1   ????
    """
    user = request['session'].get('user', None)
    if user:
        response = json({'status': 1})
        del response.cookies['user']
        del response.cookies['owl_sid']
        return response
    else:
        return json({'status': 0})
operate_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def owllook_delete_bookmark(request):
    """
    ????
    :param request:
    :return:
        :   -1  ??session??  ??????
        :   0   ??????
        :   1   ??????
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    bookmarkurl = data.get('bookmarkurl', '')
    if user and bookmarkurl:
        bookmark = unquote(bookmarkurl[0])
        try:
            motor_db = motor_base.get_db()
            await motor_db.user_message.update_one({'user': user},
                                                   {'$pull': {'bookmarks': {"bookmark": bookmark}}})
            LOGGER.info('??????')
            return json({'status': 1})
        except Exception as e:
            LOGGER.exception(e)
            return json({'status': 0})
    else:
        return json({'status': -1})
operate_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def change_email(request):
    """
    ??????
    :param request:
    :return:
        :   -1  ??session??  ??????
        :   0   ??????
        :   1   ??????
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    if user:
        try:
            email = data.get('email', None)[0]
            motor_db = motor_base.get_db()
            await motor_db.user.update_one({'user': user},
                                           {'$set': {'email': email}})
            LOGGER.info('??????')
            return json({'status': 1})
        except Exception as e:
            LOGGER.exception(e)
            return json({'status': 0})
    else:
        return json({'status': -1})
authorized_sanic.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def authorized():
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # run some method that checks the request
            # for the client's authorization status
            is_authorized = check_request_for_authorization_status(request)

            if is_authorized:
                # the user is authorized.
                # run the handler method and return the response
                response = await f(request, *args, **kwargs)
                return response
            else:
                # the user is not authorized.
                return json({'status': 'not_authorized'}, 403)
        return decorated_function
    return decorator
test_utf8.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_utf8_post_json():
    app = Sanic('test_utf8_post_json')

    @app.route('/')
    async def handler(request):
        return text('OK')

    payload = {'test': '?'}
    headers = {'content-type': 'application/json'}

    request, response = app.test_client.get(
        '/',
        data=json_dumps(payload), headers=headers)

    assert request.json.get('test') == '?'
    assert response.text == 'OK'
test_requests.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_match_info():
    app = Sanic('test_match_info')

    @app.route('/api/v1/user/<user_id>/')
    async def handler(request, user_id):
        return json(request.match_info)

    request, response = app.test_client.get('/api/v1/user/sanic_user/')

    assert request.match_info == {"user_id": "sanic_user"}
    assert json_loads(response.text) == {"user_id": "sanic_user"}


# ------------------------------------------------------------ #
#  POST
# ------------------------------------------------------------ #
test_response.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_method_not_allowed():
    app = Sanic('method_not_allowed')

    @app.get('/')
    async def test(request):
        return response.json({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.headers['Allow']== 'GET'

    @app.post('/')
    async def test(request):
        return response.json({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.status == 405
    assert set(response.headers['Allow'].split(', ')) == set(['GET', 'POST'])
    assert response.headers['Content-Length'] == '0'
test_request_data.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_app_injection():
    app = Sanic('test_app_injection')
    expected = random.choice(range(0, 100))

    @app.listener('after_server_start')
    async def inject_data(app, loop):
        app.injected = expected

    @app.get('/')
    async def handler(request):
        return json({'injected': request.app.injected})

    request, response = app.test_client.get('/')

    response_json = loads(response.text)
    assert response_json['injected'] == expected
views.py 文件源码 项目:WebGames 作者: Julien00859 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def signin(req):
    if any(map(lambda key: key not in req.json, ["login", "password"])):
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    user = await User.get_by_login(req.json["login"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not compare_digest(user.password, User.hashpwd(req.json["password"])):
        logger.debug(f"Request is {req.json} but the password is invalid.")
        unfreeze = await accounts.freeze(user.id, req.ip)
        raise InvalidUsage("Invalid password. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds"))

    await accounts.unfreeze(user.id, req.ip)
    token = await accounts.register(user.id)
    logger.info(f"User {user.name} connected. Token generated: {token}")
    return json({"token": token, "id": user.id, "name": user.name})
views.py 文件源码 项目:WebGames 作者: Julien00859 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def refresh(req):
    if "id" not in req.json or "token" not in req.json:
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    user = User.get_by_id(req.json["id"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not await accounts.is_valid(req.json["id"], req.json["token"]):
        logger.debug(f"Request is {req.json} but the token is invalid or expirated.")
        await accounts.freeze(user.id, req.ip)
        raise NotFound("Token not found or expirated")

    await accounts.unfreeze(user.id, req.ip)
    logger.info(f"User {user.name} refreshed it's token: {req.json['token']}")
    return json({"token": req.json["token"], "id": user.id, "name": user.name})


问题


面经


文章

微信
公众号

扫码关注公众号