python类json_response()的实例源码

api.py 文件源码 项目:mblog 作者: moling3650 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def register(*, name, email, sha1_pw, oid=None, image=None):
    """Create a new user and return a signed-in JSON response.

    Declared ``async`` because the body awaits the model calls; ``await``
    inside a plain ``def`` is a SyntaxError.

    :param name: user name; stored with surrounding whitespace stripped.
    :param email: e-mail address; rejected if a ``User`` with it already exists.
    :param sha1_pw: password value, stored unchanged in ``User.password``.
    :param oid: optional id; when truthy an ``Oauth`` row linking it to the
        new user is saved.
    :param image: optional image path; falls back to ``/static/img/user.png``.
    :raises APIValueError: when ``email`` is already present in ``User``.
    :return: whatever ``user.signin`` returns for the JSON response.
    """
    check_string(name=name)
    check_email_and_password(email, sha1_pw)

    existing = await User.findAll('email = ?', [email])
    if existing:
        raise APIValueError('email', 'Email is already in used.')

    user = User(name=name.strip(), email=email, password=sha1_pw,
                image=image or '/static/img/user.png')
    await user.save()

    if oid:
        # Link the external id to the freshly created account.
        await Oauth(id=oid, user_id=user.id).save()

    # Registration succeeded: sign the user in on the outgoing response.
    return user.signin(web.json_response({'signin user': user.name}))


# ????
view.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def post(self) -> web.Response:
    """Create an active monitor from the JSON request body.

    The body must provide ``args`` and ``monitor_def``. ``monitor_def`` is
    resolved by name when ``use_monitor_def_name`` is truthy, otherwise by
    integer id in the manager's ``monitor_defs``.

    Declared ``async`` (the body awaits) and annotated with the response type
    it actually returns instead of ``None``.

    :raises errors.InvalidData: when the monitor definition is not found or
        ``create_active_monitor`` returns a falsy value.
    :return: JSON response containing the new monitor's id.
    """
    request_data = await self.request.json()
    args = require_dict(request_data['args'], str, None)
    manager = self.request.app['active_monitor_manager']

    if request_data.get('use_monitor_def_name', False):
        monitor_def = get_monitor_def_by_name(
            manager, require_str(request_data['monitor_def']))
    else:
        monitor_def = manager.monitor_defs.get(
            require_int(request_data['monitor_def']))
    if not monitor_def:
        raise errors.InvalidData('Monitor def not found')

    monitor = await create_active_monitor(manager, args, monitor_def)
    if not monitor:
        raise errors.InvalidData('invalid monitor arguments')
    return web.json_response(monitor.id)
view.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def get(self) -> web.Response:
    """Return contacts, with their metadata applied, as JSON.

    Selection depends on the query string:

    * ``id`` - the single contact with that id (empty list if not found);
    * ``meta_key`` (with ``meta_value``) - contacts matching that metadata;
    * neither - all contacts.

    Declared ``async`` because the body awaits the database helpers.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query

    if 'id' in query:
        contact_id = require_int(get_request_param(self.request, 'id'))
        c = await contact.get_contact(dbcon, contact_id)
        contact_list = []  # type: Iterable[object_models.Contact]
        if c:
            contact_list = [c]
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact', contact_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        contact_list = await contact.get_contacts_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'contact', 'contacts')
    else:
        contact_list = await contact.get_all_contacts(dbcon)
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact')

    return web.json_response(apply_metadata_to_model_list(contact_list, metadata_list))
view.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def get(self) -> web.Response:
    """Return contact groups, with their metadata applied, as JSON.

    Selection depends on the query string:

    * ``id`` - the single contact group with that id (empty list if not found);
    * ``meta_key`` (with ``meta_value``) - contact groups matching that metadata;
    * neither - all contact groups.

    Declared ``async`` because the body awaits the database helpers.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query

    if 'id' in query:
        contact_group_id = require_int(get_request_param(self.request, 'id'))
        contact_group_item = await contact.get_contact_group(dbcon, contact_group_id)
        contact_group_list = []  # type: Iterable[object_models.ContactGroup]
        if contact_group_item:
            contact_group_list = [contact_group_item]
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact_group', contact_group_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        contact_group_list = await contact.get_contact_groups_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'contact_group', 'contact_groups')
    else:
        contact_group_list = await contact.get_all_contact_groups(dbcon)
        # The metadata object type must match the listed objects, as in the
        # other two branches; 'monitor_group' here attached the wrong metadata.
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact_group')

    return web.json_response(apply_metadata_to_model_list(contact_group_list, metadata_list))
view.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def get(self) -> web.Response:
    """Return monitor groups, with their metadata applied, as JSON.

    Selection depends on the query string:

    * ``id`` - the single monitor group with that id (empty list if not found);
    * ``meta_key`` (with ``meta_value``) - monitor groups matching that metadata;
    * neither - all monitor groups.

    Declared ``async`` because the body awaits the database helpers.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query

    if 'id' in query:
        monitor_group_id = require_int(get_request_param(self.request, 'id'))
        monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id)
        monitor_group_list = []  # type: Iterable[object_models.MonitorGroup]
        if monitor_group_item:
            monitor_group_list = [monitor_group_item]
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'monitor_group', monitor_group_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        monitor_group_list = await monitor_group.get_monitor_groups_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'monitor_group', 'monitor_groups')
    else:
        monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon)
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group')

    return web.json_response(apply_metadata_to_model_list(monitor_group_list, metadata_list))
rest.py 文件源码 项目:Telegram-Music-Bot 作者: rexx0520 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def search(self, request):
    """Return one page of tracks as JSON.

    Query parameters: ``text`` (optional; when present ``text_search`` is
    used, otherwise all tracks are listed), ``offset`` (default 0) and
    ``limit`` (default 10). The ``_id`` field is removed from every result
    before it is serialised.

    Declared ``async`` because the body awaits the cursor operations.

    :return: JSON object with ``tracks``, ``offset``, ``limit`` and ``total``.
    """
    text = request.GET.get("text")
    offset = int(request.GET.get("offset", 0))
    limit = int(request.GET.get("limit", 10))

    cursor = text_search(text) if text else db.tracks.find({})
    total = await cursor.count()
    results = await cursor.skip(offset).limit(limit).to_list(limit)
    for track in results:
        del track["_id"]

    return web.json_response({
        "tracks": results,
        "offset": offset,
        "limit": limit,
        "total": total
    })
keystone.py 文件源码 项目:picasso 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def auth_through_token(app: web.Application, handler):
    """Wrap ``handler`` so requests are authenticated by ``X-Auth-Token`` first.

    The returned coroutine builds an ``identity.Token`` from the request's
    ``X-Auth-Token`` header and ``project_id`` route value, opens a session
    and client with it and calls ``authenticate``. Any exception raised while
    doing so yields a 401 JSON error carrying the exception text; otherwise
    the wrapped ``handler`` is awaited.
    """
    async def middleware_handler(request: web.Request):
        token = request.headers.get("X-Auth-Token")
        project = request.match_info.get('project_id')
        conf = config.Config.config_instance()
        try:
            keystone = client.Client(
                session=session.Session(
                    auth=identity.Token(conf.auth_url,
                                        token=token,
                                        project_id=project)),
                project_id=project)
            keystone.authenticate(token=token)
        except Exception as ex:
            reason = ("Not authorized. Reason: {}"
                      .format(str(ex)))
            payload = {"error": {"message": reason}}
            return web.json_response(status=401, data=payload)
        return await handler(request)

    return middleware_handler
test_client.py 文件源码 项目:rauc-hawkbit 作者: rauc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def hello(request):
    """Answer with a fixed JSON document: a polling config plus two links."""
    polling = {"sleep": "12:00:00"}
    links = {
        "deploymentBase": {
            "href": "https://rollouts-cs.apps.bosch-iot-cloud.com/TENANT_ID/controller/v1/CONTROLLER_ID/deploymentBase/3?c=-2129030598"
        },
        "configData": {
            "href": "https://rollouts-cs.apps.bosch-iot-cloud.com/TENANT_ID/controller/v1/CONTROLLER_ID/configData"
        },
    }
    return web.json_response({"config": {"polling": polling}, "_links": links})
utilities.py 文件源码 项目:PollBot 作者: mozilla 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def heartbeat(request):
    """Report the heartbeat of every backing service as JSON.

    All service heartbeats are gathered concurrently. The HTTP status is 200
    when every result is truthy and 503 otherwise.

    Declared ``async`` because the body awaits ``asyncio.gather``.
    """
    info = await asyncio.gather(archives.heartbeat(),
                                balrog.heartbeat(),
                                bedrock.heartbeat(),
                                bouncer.heartbeat(),
                                buildhub.heartbeat(),
                                crash_stats.heartbeat(),
                                product_details.heartbeat(),
                                telemetry.heartbeat())
    # Conditional expression instead of the fragile ``x and a or b`` form.
    status = 200 if all(info) else 503
    return web.json_response({"archive": info[0],
                              "balrog": info[1],
                              "bedrock": info[2],
                              "bouncer": info[3],
                              "buildhub": info[4],
                              "crash-stats": info[5],
                              "product-details": info[6],
                              "telemetry": info[7]},
                             status=status)
decorators.py 文件源码 项目:PollBot 作者: mozilla 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def validate_product_version(func):
    """Decorate a handler so ``product`` and ``version`` are validated first.

    The wrapper reads ``product`` (required) and ``version`` (optional) from
    ``request.match_info``. It answers with a 404 JSON body when the product
    is not in ``PRODUCTS`` or when a version is given and
    ``is_valid_version`` rejects it. Otherwise it awaits
    ``func(request, product, version)``, or ``func(request, product)`` when
    no version was supplied.
    """
    import functools  # local import: the file's import block is not visible here

    @functools.wraps(func)  # keep the wrapped handler's name and docstring
    async def decorate(request):
        product = request.match_info['product']
        version = request.match_info.get('version')

        if product not in PRODUCTS:
            return web.json_response({
                'status': 404,
                'message': 'Invalid product: {} not in {}'.format(product, PRODUCTS)
            }, status=404)

        if version and not is_valid_version(version):
            return web.json_response({
                'status': 404,
                'message': 'Invalid version number: {}'.format(version)
            }, status=404)

        if version:
            return await func(request, product, version)

        return await func(request, product)

    return decorate
release.py 文件源码 项目:PollBot 作者: mozilla 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def status_response(task):
    """Turn ``task(product, version)`` into a validated JSON request handler.

    The result of ``task`` is returned as JSON. If ``task`` raises, the
    exception is logged and a JSON body with ``status: error`` and the
    exception text is returned instead, plus a ``link`` entry when the
    exception has a non-``None`` ``url`` attribute.
    """
    @validate_product_version
    async def wrapped(request, product, version):
        try:
            result = await task(product, version)
        except Exception as e:  # Anything that went wrong becomes an error status message
            logger.exception(e)
            body = {'status': 'error', 'message': str(e)}
            error_url = getattr(e, 'url', None)
            if error_url is not None:
                body['link'] = error_url
            return web.json_response(body)
        else:
            return web.json_response(result)

    return wrapped
server.py 文件源码 项目:py-evm 作者: ethereum 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def handle(self, request):
    """Serve a JSON-RPC block lookup request.

    The JSON body must contain ``id``, ``method`` and a two-element
    ``params`` list whose first element is a block number (hex string or
    ``"latest"``) for ``eth_getBlockByNumber``, or a hex block hash for
    ``eth_getBlockByHash``. The second element is ignored.

    Declared ``async`` because the body awaits the request and chain calls.

    :raises HTTPMethodNotAllowed: for any other ``method`` value.
    :return: JSON-RPC 2.0 response whose ``result`` is the block as a dict.
    """
    body = await request.json()
    req_id = body['id']
    method = body['method']
    hash_or_number, _ = body['params']

    if method == 'eth_getBlockByNumber':
        if hash_or_number == "latest":
            number = self.chain.get_canonical_head().block_number
        else:
            number = int(hash_or_number, 16)
        block = await self.chain.get_canonical_block_by_number(number)
    elif method == 'eth_getBlockByHash':
        block = await self.chain.get_block_by_hash(decode_hex(hash_or_number))
    else:
        raise HTTPMethodNotAllowed(method, self.allowed_methods)

    response = {"jsonrpc": "2.0", "id": req_id, "result": self._block_to_dict(block)}
    return web.json_response(response)
handlers.py 文件源码 项目:Pirus 作者: REGOVAR 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def rest_error(message:str="Unknow", code:str="0", error_id:str=""):
    """
        Build the REST error response.

        :param message:     The short, user-friendly error message
        :param code:        The code of the error type; it is sent as both
                            ``error_code`` and ``error_url``
        :param error_id:    The id of the error, returned to the end user so
                            that admins can find where exactly the error
                            occurred in the logs
        :return:            JSON response with ``success`` set to False
    """
    payload = dict(
        success=False,
        msg=message,
        error_code=code,
        error_url=code,
        error_id=error_id,
    )
    return web.json_response(payload)
test_storage_http.py 文件源码 项目:aioworkers 作者: aioworkers 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def test_set_get(loop, test_client):
    """Check the HTTP storage against a stub app serving ``/test/1``.

    ``get`` must return the stub's JSON list (containing ``'Python'``) and
    ``set`` must raise ``StorageError``, as the stub app only registers a
    GET route.

    Declared ``async`` because the body uses ``await`` and ``async with``.
    """
    app = web.Application()
    app.router.add_get(
        '/test/1',
        lambda x: web.json_response(["Python"]))
    client = await test_client(app)
    url = client.make_url('/')

    data = 'Python'
    config = MergeDict(
        storage=MergeDict(
            cls='aioworkers.storage.http.Storage',
            prefix=str(url),
            semaphore=1,
            format='json',
        ),
    )
    async with Context(config=config, loop=loop) as context:
        storage = context.storage
        assert data in await storage.get('test/1')
        with pytest.raises(StorageError):
            await storage.set('test/1', data)
resources_aiotransmission.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def _make_response(self, request, response):
    """Build the HTTP response for ``request`` from a configured ``response``.

    * callable - called with ``request`` (awaited if it is a coroutine
      function) and its result returned;
    * dict - returned as a JSON response;
    * otherwise the request body is parsed as JSON: a ``session-get`` method
      gets ``SESSION_GET_RESPONSE``; any other request gets ``response`` as
      the text of a plain response.

    Declared ``async`` because the body awaits.

    :raises RuntimeError: when ``response`` is ``None`` and the request is
        not ``session-get``.
    """
    if callable(response):
        if asyncio.iscoroutinefunction(response):
            return await response(request)
        return response(request)
    if isinstance(response, dict):
        return web.json_response(response)

    rqdata = await request.json()
    if rqdata.get('method') == 'session-get' and 'method' in rqdata:
        return web.json_response(SESSION_GET_RESPONSE)
    if response is None:
        raise RuntimeError('Set the response property before making a request!')
    return web.Response(text=response)
rpc_test.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def test_timeout_plus_one(self):
    """Connecting must fail when the daemon answers after the client timeout.

    The daemon's response is delayed by ``client.timeout + 1``. The test
    expects a ``ConnectionError`` mentioning "timeout", the timeout value
    and the URL, no connected/disconnected callbacks, and exactly one error
    callback.

    Declared ``async`` because the body awaits ``asyncio.gather``.
    """
    delay = self.client.timeout+1

    # NOTE: This function is only called sometimes, probably depending on
    # which task finishes first, advance() or client.connect().
    async def delay_response(request):
        await asyncio.sleep(delay, loop=self.loop)
        return web.json_response(rsrc.SESSION_GET_RESPONSE)
    self.daemon.response = delay_response

    with self.assertRaises(ConnectionError) as cm:
        await asyncio.gather(self.advance(delay),
                             self.client.connect(),
                             loop=self.loop)
    self.assertIn('timeout', str(cm.exception).lower())
    self.assertIn(str(self.client.timeout), str(cm.exception))
    self.assertIn(str(self.url), str(cm.exception))

    self.assert_cb_connected_called(calls=0)
    self.assert_cb_disconnected_called(calls=0)
    self.assert_cb_error_called(calls=1,
                                args=[(self.url,)],
                                kwargs=[{'error': r'{}.*{}'.format(self.client.timeout,
                                                                   self.url)}])
web.py 文件源码 项目:livebridge 作者: dpa-newslab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def login(self, request):
    """Check posted credentials against the configured ones and issue a token.

    Responses:

    * 400 when ``config["auth"]`` lacks a non-empty ``user`` or ``password``;
    * 200 with ``{"token": ...}`` when the posted ``username`` and
      ``password`` match; the new UUID4 token is stored in ``_tokens``;
    * 401 otherwise.

    Declared ``async`` because the body awaits ``request.post()``. The
    configuration is validated with explicit checks rather than ``assert``,
    which is stripped under ``python -O`` and did not cover a missing key.
    """
    try:
        expected_user = self.config["auth"]["user"]
        expected_password = self.config["auth"]["password"]
    except (KeyError, TypeError):
        expected_user = expected_password = None
    if not expected_user or not expected_password:
        logger.error("HTTP Auth credentials are missing!")
        return web.json_response({"error": "Auth credentials are missing."}, status=400)

    params = await request.post()
    user = params.get('username', None)
    if user == expected_user and params.get('password', None) == expected_password:
        # Credentials match the configured ones: remember a fresh token.
        _tokens[user] = str(uuid.uuid4())
        return web.json_response({"token": _tokens[user]})
    return web.json_response({"error": "Unauthorized"}, status=401)
new_book.py 文件源码 项目:event-driven-microservice 作者: Lawouach 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def bookshelf_new(request):
    """
    Add the given book to the shelf event lake.

    The request body is decoded as UTF-8 JSON, given a fresh UUID4 ``id``,
    wrapped in a ``book-added`` event and sent on ``b"bookshelf"``. The book,
    including its new id, is returned with status 201.

    Declared ``async`` because the body awaits the request and the send.
    """
    payload = await request.content.read()

    # ensure the book gets an id
    book = json.loads(payload.decode('utf-8'))
    book["id"] = str(uuid.uuid4())

    # create an event from this request
    event = make_event(name="book-added",
                       payload=json.dumps(book),
                       safe=False, idempotent=False)

    # let's push it
    await send_event(b"bookshelf", event)

    return web.json_response(status=201, data=book)
integration_server.py 文件源码 项目:bravado-asyncio 作者: sjaensch 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def upload_pet_image(request):
    """Accept an upload only if it matches the bundled ``sample.jpg``.

    Returns ``HTTPBadRequest`` when the uploaded ``file`` differs from
    ``sample.jpg`` next to this module, or when ``petId`` is not ``'42'`` or
    the posted ``userId`` is not ``'12'``; otherwise an empty JSON object.

    Declared ``async`` because the body awaits ``request.post()``.
    """
    sample_path = os.path.join(os.path.dirname(__file__), 'sample.jpg')
    with open(sample_path, 'rb') as f:
        data = await request.post()
        file_data = data.get('file')
        content = file_data.file.read()
        expected_content = f.read()

    if content != expected_content:
        return web.HTTPBadRequest()

    if not (
        request.match_info['petId'] == '42'
        and data.get('userId') == '12'
    ):
        return web.HTTPBadRequest()

    return web.json_response({})
web.py 文件源码 项目:fireq 作者: superdesk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def hook(request):
    """Handle a signed webhook call and schedule a CI task for it.

    The raw body's signature (``get_signature``) must equal the
    ``X-Hub-Signature`` header, otherwise ``HTTPBadRequest`` is returned.
    The remaining headers and the JSON body are passed to ``get_hook_ctx``;
    a truthy result is scheduled through ``ci`` on the app loop and also
    returned as JSON.

    Declared ``async`` because the body awaits the request.
    """
    body = await request.read()
    check_signature = hmac.compare_digest(
        get_signature(body),
        request.headers.get('X-Hub-Signature', '')
    )
    if not check_signature:
        return web.HTTPBadRequest()

    body = await request.json()
    # Drop the signature header case-insensitively. A plain dict copy is
    # case-sensitive, so ``del headers['X-Hub-Signature']`` raised KeyError
    # when the client sent the header name in a different case.
    headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() != 'x-hub-signature'
    }
    ref = get_hook_ctx(headers, body, clean=True)
    if ref:
        request.app.loop.create_task(ci(ref))
    return web.json_response(ref)
sunucu.py 文件源码 项目:milisia-tox 作者: milisarge 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dps_baglan(request):
    """Start a tuntox tunnel to the posted node and return its local address.

    Reads ``kdugum`` from the POST data, builds a ``./tuntox`` command line
    that forwards a random local port in 38000-39999 to ``127.0.0.1:33999``,
    runs it through ``komutar`` and returns ``lokalhost:<local port>`` as
    JSON.

    ``kdugum`` comes from the client, so it is shell-quoted before it is put
    into the command line. A value made only of safe characters is left
    unchanged by ``shlex.quote``.
    """
    import shlex  # local import: the file's import block is not visible here

    data = yield from request.post()
    toxid = data['kdugum']
    print (toxid)
    port = 33999
    lport = random.randrange(38000, 40000)
    komut = ("./tuntox -i " + shlex.quote(str(toxid))
             + " -L " + str(lport) + ":127.0.0.1:" + str(port))
    print ("dugumler aras? tunel ac?l?yor.")
    # Saving the tunnel id is currently disabled:
    # open("yenidugum","w").write(toxid)
    # NOTE(review): how komutar executes the command is not visible here -
    # confirm it handles a quoted argument as expected.
    yield from komutar(komut)
    link = lokalhost + ":" + str(lport)
    return web.json_response(data=link)
views.py 文件源码 项目:Domain-Expiration 作者: Palen 项目源码 文件源码 阅读 181 收藏 0 点赞 0 评论 0
async def _post_(self, request):
    """Register the posted ``domain`` and report the outcome as JSON.

    Returns ``{"response": "KO", "message": ...}`` when the domain name is
    empty, not allowed, or already stored in ``self.db.domains``; otherwise
    saves it and returns ``{"response": "OK"}``.

    Declared ``async`` because the body awaits ``request.post()``.
    """
    data = await request.post()
    domain = data.get('domain', "")
    domain_expiration = DomainExpiration(domain=domain)

    if not domain_expiration.name:
        response = {"response": "KO", "message": self.EMPTY_DOMAIN}
    elif not domain_expiration.allowed:
        response = {"response": "KO", "message": self.ALLOWED}
    elif self.db.domains.find_one({"name": domain_expiration.name}):
        response = {"response": "KO", "message": self.EXISTS}
    else:
        domain_expiration.save()
        response = {"response": "OK"}

    return web.json_response(response)
start.py 文件源码 项目:iwant-bot 作者: kiwicom 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def handle_slack_button(request):
    """Handle a Slack button callback.

    The posted ``payload`` field is parsed as JSON and its token verified;
    an invalid or missing token yields ``'Unverified message.'``. When the
    first action is named ``Cancel`` the matching iwant task is cancelled,
    with ``text`` and ``user_id`` filled in if absent.

    Declared ``async`` because the body awaits ``request.post()``.

    NOTE(review): ``'Request was cancelled.'`` is returned for every verified
    request, including ones whose action is not ``Cancel`` - confirm this is
    intended.
    """
    payload = multidict_to_dict(await request.post())
    body = json.loads(payload['payload'])
    print(f'INFO: Button request body:\n{body}.')

    try:
        verify_request_token(body)
    except (KeyError, TokenError) as err:
        print(f'INFO: Invalid token: {err}')
        return web.json_response({'text': 'Unverified message.'})

    if body['actions'][0]['name'] == 'Cancel':
        if 'text' not in body:
            body['text'] = ''
        if 'user_id' not in body:
            body['user_id'] = body['user']['id']
        iwant_object = IwantRequest(body, (), (), _slack_user_pattern)
        iwant_object.cancel_iwant_task()

    return web.json_response({'text': 'Request was cancelled.'})
server.py 文件源码 项目:yQuant 作者: yoonbae81 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handler(request):
    """Run the strategy for a ``/<symbol>/<price>/<volume>`` request.

    The price is appended to ``SHARED_MEMORY[symbol]``, ``STRATEGY_FUNC`` is
    evaluated and timed, and the result is returned as JSON.

    A path with the wrong number of segments, or a price that is not an
    integer, yields an empty 500 response. The price is validated before
    ``SHARED_MEMORY`` is touched, so a malformed request no longer leaves a
    bad entry behind before failing.
    """
    try:
        print(f'Requested: {request.path_qs}')
        _, symbol, price, volume = request.path_qs.split('/')
        stoploss = int(price) - 3000
    except ValueError:
        return web.Response(status=500)

    t0 = time.time()            # for profiling
    SHARED_MEMORY[symbol].append(price)
    strength = STRATEGY_FUNC(SHARED_MEMORY, symbol, price, volume)
    elapsed_time = time.time() - t0  # for profiling

    # Send a signal
    print(f'Analyzed {symbol} => {strength} ({elapsed_time:.4f})')

    result = {
        'symbol': symbol,
        'price': price,
        'strategy': ARGS['strategy'],
        'strength': strength,
        'stoploss': stoploss,
        'elapsedTime': elapsed_time
    }

    return web.json_response(result)
response.py 文件源码 项目:Windless 作者: chiaki64 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def geass(context, request, tmpl=None, *, app_key=APP_KEY, encoding='utf-8', status=200):
    """Render ``context`` as JSON or through a template.

    Without ``tmpl`` the context is returned as a JSON response (``status``
    is not applied on that path). With ``tmpl`` the context is extended with
    ``PAGE_IDENTIFIER`` (when it has an ``identifier`` route name) and
    ``drawer_category``, rendered with ``render_template`` and given
    ``status``.

    :raises RuntimeError: when preparing the context fails; the original
        error is chained as the cause.
    """
    if tmpl is None:
        return web.json_response(context)
    try:
        if 'identifier' in context:
            context['PAGE_IDENTIFIER'] = request.app.router[context['identifier']].url()
        context['drawer_category'] = CONST.CATEGORY
    except Exception as exc:
        # Only ordinary errors are converted; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        raise RuntimeError from exc

    response = render_template(tmpl, request, context,
                               app_key=app_key, encoding=encoding)
    response.set_status(status)
    return response
webhook_agent.py 文件源码 项目:python-zentropi 作者: zentropi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def webhook_emit(self, request):
    """Emit an event named by the webhook call, carrying its parameters.

    ``name`` is taken from the route, then the query string, then the POST
    data. ``token`` is taken from the route, then the query string. All
    other query and POST parameters become the event data (POST wins on
    duplicate keys).

    Fixes over the previous logic:

    * a name present in only one of query/POST data is accepted (the old
      ``or`` test required it in both);
    * the "token not found" check is no longer inverted;
    * the token is actually compared with ``TOKEN``; the old chained
      comparison ``token != TOKEN is False`` could never reject a token
      unless ``TOKEN`` itself was ``False``.

    :return: JSON ``{'success': True}``, or ``{'success': False, 'message': ...}``
        when the name or token is missing or the token is wrong.
    """
    post_data = await request.post()

    name = (request.match_info.get('name', None)
            or request.GET.get('name', None)
            or post_data.get('name', None))
    if not name:
        return web.json_response({'success': False, 'message': 'Error: required parameter "name" not found.'})

    token = request.match_info.get('token', None) or request.GET.get('token', None)
    if not token and 'X-Hub-Signature' not in request.headers:
        return web.json_response({'success': False, 'message': 'Error: required parameter "token" not found.'})
    # NOTE(review): X-Hub-Signature is accepted as "present" above but never
    # verified in this block, so a signature-only request fails here.
    if token != TOKEN:
        return web.json_response({'success': False, 'message': 'Error: authentication failed. Invalid token.'})

    reserved = ('name', 'token')
    data = {k: v for k, v in request.GET.items() if k not in reserved}
    if post_data:
        data.update({k: v for k, v in post_data.items() if k not in reserved})
    self.emit(name, data=data)
    return web.json_response({'success': True})
statistics.py 文件源码 项目:FogLAMP 作者: foglamp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_statistics(request):
    """
    Args:
        request:

    Returns:
            the rows of the 'statistics' table (key, description, value),
            ordered by key

    :Example:
            curl -X GET http://localhost:8081/foglamp/statistics
    """
    # Build the query: three columns, sorted by key.
    builder = PayloadBuilder().SELECT(("key", "description", "value"))
    query = builder.ORDER_BY(["key"]).payload()

    storage = connect.get_storage()
    rows = storage.query_tbl_with_payload('statistics', query)['rows']
    return web.json_response(rows)
configuration.py 文件源码 项目:FogLAMP 作者: foglamp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def get_categories(request):
    """
    Declared async because the body awaits the configuration manager.

    Args:
         request:

    Returns:
            the list of known categories in the configuration database,
            each as {"key": ..., "description": ...}

    :Example:
            curl -X GET http://localhost:8081/foglamp/categories
    """
    # TODO: make it optimized and elegant
    cf_mgr = ConfigurationManager(connect.get_storage())
    categories = await cf_mgr.get_all_category_names()
    # Each entry is indexed positionally: [0] is the key, [1] the description.
    categories_json = [{"key": c[0], "description": c[1]} for c in categories]

    return web.json_response({'categories': categories_json})
configuration.py 文件源码 项目:FogLAMP 作者: foglamp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
async def get_category(request):
    """
    Declared async because the body awaits the configuration manager.

    Args:
         request: category_name is required

    Returns:
            the configuration items in the given category.

    Raises:
            web.HTTPBadRequest: category_name is missing or empty
            web.HTTPNotFound: the manager returns None for the category

    :Example:
            curl -X GET http://localhost:8081/category/PURGE_READ
    """
    category_name = request.match_info.get('category_name', None)

    if not category_name:
        raise web.HTTPBadRequest(reason="Category Name is required")

    # TODO: make it optimized and elegant
    cf_mgr = ConfigurationManager(connect.get_storage())
    category = await cf_mgr.get_category_all_items(category_name)

    if category is None:
        raise web.HTTPNotFound(reason="No such Category Found for {}".format(category_name))

    return web.json_response(category)
configuration.py 文件源码 项目:FogLAMP 作者: foglamp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def get_category_item(request):
    """
    Declared async because the body awaits the configuration manager.

    Args:
         request: category_name & config_item are required

    Returns:
            the configuration item in the given category.

    Raises:
            web.HTTPBadRequest: category_name or config_item is missing or empty
            web.HTTPNotFound: the manager returns None for the item

    :Example:
            curl -X GET http://localhost:8081/foglamp/category/PURGE_READ/age
    """
    category_name = request.match_info.get('category_name', None)
    config_item = request.match_info.get('config_item', None)

    if not category_name or not config_item:
        raise web.HTTPBadRequest(reason="Both Category Name and Config items are required")

    # TODO: make it optimized and elegant
    cf_mgr = ConfigurationManager(connect.get_storage())
    category_item = await cf_mgr.get_category_item(category_name, config_item)

    if category_item is None:
        raise web.HTTPNotFound(reason="No Category Item Found")

    return web.json_response(category_item)


问题


面经


文章

微信
公众号

扫码关注公众号