python类loads()的实例源码

client.py 文件源码 项目:public-dns 作者: ssut 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def query(self, hostname, type='A', dnssec=True):
        """Look up ``hostname`` over the HTTP DNS endpoint and return the parsed answer.

        For PTR lookups the ``.in-addr.arpa`` suffix is appended when the
        caller did not supply it (with or without the trailing dot).

        :param hostname: name to resolve; must pass ``utils.validate_hostname``
        :param type: record type; must pass ``utils.validate_rr_type``
        :param dnssec: forwarded unchanged to ``self.build_params``
        :return: whatever ``utils.populate_response`` builds from the JSON body
        :raises InvalidHTTPStatusCode: when the server does not answer with 200
        """
        assert utils.validate_hostname(hostname)
        assert utils.validate_rr_type(type)

        wants_ptr = type in ('PTR', RR['PTR'])
        if wants_ptr and not hostname.endswith(('.in-addr.arpa',
                                                '.in-addr.arpa.')):
            hostname = '%s.in-addr.arpa' % (hostname)

        query_string = self.build_params(hostname, type, dnssec)
        url = '%s?%s' % (self.server, query_string)
        request = self.session.request('GET', url,
                                       headers=PublicDNS.default_headers)
        response = self.session.get_response(request)
        if response.status != 200:
            raise InvalidHTTPStatusCode

        payload = load_json(response.read())
        return utils.populate_response(payload)
request.py 文件源码 项目:kaira 作者: mulonemartin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_body(self):
        """ Load the HTTP request body and return form data and files.

        The parser is picked from the request's content type:
        url-encoded forms and JSON bodies return ``(data, None)``,
        multipart bodies return whatever ``parse_multipart`` produces and
        any other content type returns ``(None, None)``.

        A missing or empty ``CONTENT_LENGTH`` is treated as a zero-length
        body and a missing ``CONTENT_TYPE`` as an unknown type, because
        PEP 3333 allows both variables to be absent or empty.

        Raises ``ValueError`` when the declared length is not an integer or
        exceeds ``self.max_content_lenght``.
        """
        environ = self.environ
        # PEP 3333: CONTENT_LENGTH may be missing or the empty string.
        cl = environ.get('CONTENT_LENGTH') or '0'
        icl = int(cl)
        if icl > self.max_content_lenght:
            raise ValueError('Maximum content length exceeded')
        fp = environ['wsgi.input']
        ct = environ.get('CONTENT_TYPE') or ''
        # application/x-www-form-urlencoded
        if '/x' in ct:
            return parse_qs(fp.read(icl).decode(self.encoding)), None
        # application/json
        elif '/j' in ct:
            return json_loads(fp.read(icl).decode(self.encoding)), None
        # multipart/form-data
        elif ct.startswith('m'):
            return parse_multipart(fp, ct, cl, self.encoding)
        else:
            return None, None
webhookhandler.py 文件源码 项目:mobot 作者: JokerQyou 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_POST(self):
        """Handle a webhook POST: validate it, acknowledge it, queue the update.

        An invalid request is answered with the HTTP code carried by the
        ``_InvalidPost`` exception. A valid request is answered with 200
        before the body is decoded and put on ``self.server.update_queue``.
        """
        self.logger.debug('Webhook triggered')
        try:
            self._validate_post()
            content_length = self._get_content_len()
        except _InvalidPost as invalid:
            self.send_error(invalid.http_code)
            self.end_headers()
            return

        raw_body = self.rfile.read(content_length)
        json_string = bytes_to_native_str(raw_body)

        # Acknowledge first; decoding happens after the response is sent.
        self.send_response(200)
        self.end_headers()

        self.logger.debug('Webhook received data: ' + json_string)

        decoded = json.loads(json_string)
        update = Update.de_json(decoded, self.server.bot)

        self.logger.debug(
            'Received Update with ID %d on Webhook' % update.update_id)
        self.server.update_queue.put(update)
mongoengine.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_queryset(self, req, resp):
        """Build the queryset for a request from its filter, search and order params.

        The JSON value of the search parameter, when present, is stored in
        ``req.params['__raw__']`` before all params are passed as keyword
        filters to ``self.objects_class.objects``. A text query and an
        ordering are applied afterwards when supplied.

        :raises HTTPBadRequest: when the search parameter is not valid JSON
        """
        text_query = self.get_param_or_post(req, self.PARAM_TEXT_QUERY)
        raw_filter = self.get_param_or_post(req, self.PARAM_SEARCH)
        if raw_filter:
            try:
                req.params['__raw__'] = json.loads(raw_filter)
            except ValueError:
                detail = 'Value of {} filter attribute is invalid'.format(self.PARAM_SEARCH)
                raise HTTPBadRequest('Invalid attribute', detail)
        ordering = self.get_param_or_post(req, self.PARAM_ORDER)

        result = self.objects_class.objects(**req.params)
        if text_query is not None:
            result = result.search_text(text_query)
        if ordering:
            result = result.order_by(ordering)
        return result
base.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_param_totals(self, req):
        """
        Gets the totals and total_count params and normalizes them into a single list.

        A string value is decoded as JSON first. A single dict or a single
        string expression is wrapped in a one-element list; every non-dict
        item of a sequence becomes ``{item: None}``. Any falsy value
        (missing, ``None``, empty string) is normalized to an empty list so
        the result is always a list. When the total_count param is set and no
        expression mentions ``count``, ``{'count': None}`` is appended.

        :param req: Falcon request
        :type req: falcon.request.Request

        :return: total expressions
        :rtype: list
        :raises ValueError: when a string value is not valid JSON
        """
        totals = self.get_param_or_post(req, self.PARAM_TOTALS, [])
        if totals and isinstance(totals, str):
            totals = json.loads(totals)

        if not totals:
            # Covers None / '' / decoded null, which would otherwise break
            # the filter() and append() calls below.
            totals = []
        elif isinstance(totals, dict):
            totals = [totals]
        elif isinstance(totals, str):
            # A lone expression; iterating it would split it into characters.
            totals = [{totals: None}]
        else:
            totals = [x if isinstance(x, dict) else {x: None} for x in totals]

        total_count = self.get_param_or_post(req, self.PARAM_TOTAL_COUNT)
        if total_count and not any('count' in x for x in totals):
            totals.append({'count': None})
        return totals
test_method_errors.py 文件源码 项目:swaggerit 作者: dutradda 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def test_integrity_error_handling_with_foreign_key(self, post_method, stream, session):
        """Posting a row with ``m2_id`` 1 must yield HTTP 400 with the database error.

        Declared ``async`` because the body awaits ``post_method``; ``await``
        inside a plain ``def`` is a SyntaxError.
        """
        stream.feed_data(ujson.dumps([{'m2_id': 1}]).encode())
        stream.feed_eof()
        request = SwaggerRequest('/model1/', 'post', body=stream, headers={'content-type': 'application/json'})
        resp = await post_method(request, session)

        assert resp.status_code == 400
        assert ujson.loads(resp.body) == {
            'params': {'m2_id': 1},
            'database message': {
                'message':  'Cannot add or update a child row: '
                            'a foreign key constraint fails '
                            '(`swaggerit_test`.`model1_swagger`, '
                            'CONSTRAINT `model1_swagger_ibfk_1` FOREIGN '
                            'KEY (`m2_id`) REFERENCES `model2_swagger` '
                            '(`id`))',
                'code': 1452
            }
        }
test_method_errors.py 文件源码 项目:swaggerit 作者: dutradda 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def test_model_base_error_handling_with_patch_and_with_nested_delete(self, patch_method, post_method, stream, session):
        """A PATCH carrying a nested ``delete`` operation must be rejected with HTTP 400.

        Declared ``async`` because the body awaits the request handlers;
        ``await`` inside a plain ``def`` is a SyntaxError.
        """
        # First request: send ``[{}]`` through post_method.
        stream.feed_data(b'[{}]')
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
        await post_method(request, session)

        # Second request needs a fresh stream; the first one already hit EOF.
        stream = asyncio.StreamReader(loop=session.loop)
        body = {'model2_': {'id': 1, '_operation': 'delete'}}
        stream.feed_data(ujson.dumps(body).encode())
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
        resp = await patch_method(request, session)

        assert resp.status_code == 400
        assert ujson.loads(resp.body) == {
            'instance': [body],
            'message': "Can't execute nested 'delete' operation"
        }
test_method_errors.py 文件源码 项目:swaggerit 作者: dutradda 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def test_model_base_error_handling_with_patch_and_with_nested_remove(self, patch_method, post_method, stream, session):
        """A PATCH carrying a nested ``remove`` operation must be rejected with HTTP 400.

        Declared ``async`` because the body awaits the request handlers;
        ``await`` inside a plain ``def`` is a SyntaxError.
        """
        # First request: send ``[{}]`` through post_method.
        stream.feed_data(b'[{}]')
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
        await post_method(request, session)

        # Second request needs a fresh stream; the first one already hit EOF.
        stream = asyncio.StreamReader(loop=session.loop)
        body = {'model2_': {'id': 1, '_operation': 'remove'}}
        stream.feed_data(ujson.dumps(body).encode())
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
        resp = await patch_method(request, session)

        assert resp.status_code == 400
        assert ujson.loads(resp.body) == {
            'instance': [body],
            'message': "Can't execute nested 'remove' operation"
        }
test_method_errors.py 文件源码 项目:swaggerit 作者: dutradda 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def test_model_base_error_handling_with_patch_and_with_nested_update(self, patch_method, post_method, stream, session):
        """A PATCH carrying a nested ``update`` operation must be rejected with HTTP 400.

        Declared ``async`` because the body awaits the request handlers;
        ``await`` inside a plain ``def`` is a SyntaxError.
        """
        # First request: send ``[{}]`` through post_method.
        stream.feed_data(b'[{}]')
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', body=stream, headers={'content-type': 'application/json'})
        await post_method(request, session)

        # Second request needs a fresh stream; the first one already hit EOF.
        stream = asyncio.StreamReader(loop=session.loop)
        body = {'model2_': {'id': 1, '_operation': 'update'}}
        stream.feed_data(ujson.dumps(body).encode())
        stream.feed_eof()
        request = SwaggerRequest('/model1/1/', 'patch', path_params={'id': 1}, body=stream, headers={'content-type': 'application/json'})
        resp = await patch_method(request, session)

        assert resp.status_code == 400
        assert ujson.loads(resp.body) == {
            'instance': [body],
            'message': "Can't execute nested 'update' operation"
        }
zcifv2.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _post(self, uri, data):
        """POST one or more dictionaries to ``uri`` as a JSON array.

        :param uri: target URL; ``?nowait=1`` is appended when ``self.nowait`` is set
        :param data: a dict, or a non-empty list whose first item is a dict
        :return: the decoded JSON response, or ``None`` when the server
                 answers with a status code above 299
        :raises RuntimeError: when ``data`` is empty or does not hold dictionaries
        """
        # isinstance also accepts dict subclasses such as OrderedDict
        if isinstance(data, dict):
            data = [data]

        # an empty payload used to fail with an unrelated IndexError
        if not data or not isinstance(data[0], dict):
            raise RuntimeError('submitted data must be a dictionary')

        data = json.dumps(data)

        if self.nowait:
            uri = "{0}?nowait=1".format(uri)

        logger.debug('uri: %s' % uri)

        resp = self.session.post(uri, data=data, verify=self.verify_ssl)
        logger.debug('status code: ' + str(resp.status_code))
        if resp.status_code > 299:
            logger.error('request failed: %s' % str(resp.status_code))
            try:
                logger.error(json.loads(resp.text).get('message'))
            except (ValueError, AttributeError):
                # the error body is not a JSON object; log it as received
                # instead of raising from the error-reporting path
                logger.error(resp.text)
            return None

        return json.loads(resp.text)
wamp.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def on_message(self, message):
        """Decode an incoming JSON frame and dispatch it on its first element.

        A three-element prefix frame registers the prefix and returns
        ``None``. Call frames go to ``self.rpc_call`` and
        subscribe/unsubscribe/publish frames to ``self.pubsub_action``; their
        results are returned.

        Raises ``Exception`` when the frame is not a list or matches no
        known message type.
        """
        frame = json.loads(message)
        if not isinstance(frame, list):
            raise Exception('incoming data is no list')

        if frame[0] == self.MSG_PREFIX and len(frame) == 3:
            self.prefixes.add(frame[1], frame[2])
            return None

        if frame[0] == self.MSG_CALL and len(frame) >= 3:
            return self.rpc_call(frame)

        pubsub_types = (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE,
                        self.MSG_PUBLISH)
        if frame[0] in pubsub_types:
            return self.pubsub_action(frame)

        raise Exception("Unknown call")
handler.py 文件源码 项目:Stock-Visualizer 作者: saguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def consume_messages(cls, **kwargs):
        """Relay messages polled from a Kafka consumer to a Redis channel.

        Broker, topic, channel, host and port are read from ``kwargs`` and
        fall back to the module defaults when missing or falsy. The
        consumer's ``shut_down`` is registered to run at interpreter exit,
        then a ``Scheduler`` is started with the relay job.
        """
        def shut_down(consumer_):
            consumer_.shut_down()

        def job(consumer_, redis_client_, redis_channel_):
            # log the decoded message, publish the raw one
            for record in consumer_.poll():
                raw = record.value
                logger.info(ujson.loads(raw))
                redis_client_.publish(redis_channel_, raw)

        # Kafka side
        broker = kwargs.get(KAFKA_BROKER) or DEFAULT_KAFKA_BROKER
        topic = kwargs.get(KAFKA_OUTPUT_TOPIC) or DEFAULT_KAFKA_OUTPUT_TOPIC
        consumer = Consumer(broker, topic)

        # Redis side
        channel = kwargs.get(REDIS_CHANNEL) or DEFAULT_REDIS_CHANNEL
        host = kwargs.get(REDIS_HOST) or DEFAULT_REDIS_HOST
        port = kwargs.get(REDIS_PORT) or DEFAULT_REDIS_PORT
        client = redis.StrictRedis(host=host, port=port)

        atexit.register(shut_down, consumer)

        Scheduler(1, job, consumer, client, channel).run()
test_request_data.py 文件源码 项目:annotated-py-sanic 作者: hhstore 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_storage():
    """Values stored on the request by middleware are visible to the handler.

    The middleware sets two keys and deletes one of them again; the handler
    must see the remaining key and get ``None`` for the deleted one.
    """
    app = Sanic('test_text')

    @app.middleware('request')
    def store(request):
        request['user'] = 'sanic'
        request['sidekick'] = 'tails'
        del request['sidekick']

    @app.route('/')
    def handler(request):
        stored = {
            'user': request.get('user'),
            'sidekick': request.get('sidekick'),
        }
        return json(stored)

    request, response = sanic_endpoint_test(app)

    decoded = loads(response.text)
    assert decoded['user'] == 'sanic'
    assert decoded.get('sidekick') is None
test_example.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_request(self, req, resp):
        """Decode a JSON request body into ``req.context['doc']``.

        Requests whose content length is ``None`` or 0 are left untouched.
        An empty body raises ``falcon.HTTPBadRequest``; a body that is not
        valid UTF-8 encoded JSON raises ``falcon.HTTPError`` with status 753.
        """
        # req.stream corresponds to the WSGI wsgi.input environ variable,
        # and allows you to read bytes from the request body.
        #
        # See also: PEP 3333
        if req.content_length in (None, 0):
            # Nothing to do
            return

        raw = req.stream.read()
        if not raw:
            raise falcon.HTTPBadRequest('Empty request body',
                                        'A valid JSON document is required.')

        try:
            text = raw.decode('utf-8')
            req.context['doc'] = json.loads(text)
        except (ValueError, UnicodeDecodeError):
            raise falcon.HTTPError(falcon.HTTP_753,
                                   'Malformed JSON',
                                   'Could not decode the request body. The '
                                   'JSON was incorrect or not encoded as '
                                   'UTF-8.')
webserver.py 文件源码 项目:esp32 作者: smeenka 项目源码 文件源码 阅读 69 收藏 0 点赞 0 评论 0
def handleSetOff(self,request):
        """Switch the motors of a driver group and acknowledge the request.

        The JSON body must hold ``group`` and ``onoff``. ``group`` selects
        the driver: ``driverL`` when it contains "L", ``driverR`` when it
        contains "R" (the latter wins when both are present). ``onoff`` is
        the string '-1', '0' or '1', mapped to ``allExit``, ``allOff`` and
        ``allOn``. When no driver matches, nothing is switched; this used to
        fail with an unbound ``driver`` variable.
        """
        value = json.loads(request.body)
        group     = value["group"]
        onoff     = value["onoff"]
        log.debug ("Group: %s Set motors %s",group,onoff)

        driver = None
        if "L" in group:
            driver = driverL
        if "R" in group:
            driver = driverR

        if driver is None:
            log.debug ("Group: %s matches no driver, nothing switched",group)
        elif onoff == '-1':
            driver.allExit()
        elif onoff == '0':
            driver.allOff()
        elif onoff == '1':
            driver.allOn()
        yield from request.sendOk()
docker_api.py 文件源码 项目:dockerscan 作者: cr0hn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _find_layers(img, id):
    """Yield layer ``id`` and then each ancestor layer id in turn.

    For every layer the ``<id>/json`` member is read from ``img`` and its
    ``parent`` entry, when present, names the next layer to visit. A few
    metadata fields are logged at debug level along the way.
    """
    current = id
    while True:
        with closing(img.extractfile('%s/json' % current)) as fd:
            raw = fd.read()
            if hasattr(raw, "decode"):
                raw = raw.decode()
            info = json.loads(raw)

        log.debug('layer = %s', current)
        for field in ['os', 'architecture', 'author', 'created']:
            if field in info:
                log.debug('%s = %s', field, info[field])

        yield current

        if 'parent' not in info:
            return
        current = info['parent']


# --------------------------------------------------------------------------
# Public API
# --------------------------------------------------------------------------
docker_api.py 文件源码 项目:dockerscan 作者: cr0hn 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_root_json_from_image(img: tarfile.TarFile) -> Tuple[str, dict]:
    """
    Locate and load the root ``.json`` metadata file of a docker image.

    The root file is the first archive member whose name ends with "json"
    and contains no "/". Members that cannot be opened as a regular file
    are skipped, and the extracted file handle is always closed.

    :param img: opened image tarball
    :return: ``(name, content)`` where ``name`` is the member name up to its
             first "." and ``content`` is the decoded JSON document;
             ``(None, None)`` when no such member exists

    >>> get_root_json_from_image(img)  # doctest: +SKIP
    ('db079554b4d2f7c65c4df3adae88cb72d051c8c3b8613eb44e86f60c945b1ca7', dict(...))
    """
    for member in img.getmembers():
        if not member.name.endswith("json") or "/" in member.name:
            continue

        fd = img.extractfile(member.name)
        if fd is None:
            # extractfile() returns None for members that are not regular files
            continue
        try:
            content = fd.read()
        finally:
            fd.close()

        if hasattr(content, "decode"):
            content = content.decode()

        return member.name.split(".")[0], json.loads(content)

    return None, None
rewarder_client.py 文件源码 项目:universe 作者: openai 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def onMessage(self, payload, isBinary):
        """Decode a text frame, record latency metrics and hand it to ``recv``.

        Latency is the context's ``start`` minus the message header's
        ``sent_at``; it is reported doubled, and negative values are counted
        separately. Binary frames are rejected by an assertion.
        """
        extra_logger.debug('[%s] Received payload: %s', self.factory.label, payload)
        assert not isBinary
        message = ujson.loads(payload)

        context = self._make_context()
        one_way = context['start'] - message['headers']['sent_at']

        pyprofile.incr('rewarder_protocol.messages')
        per_method = 'rewarder_protocol.messages.{}'.format(message['method'])
        pyprofile.incr(per_method)

        # Double latency to model RTT
        pyprofile.timing('rewarder_protocol.latency.rtt.skew_unadjusted', 2*one_way)
        if one_way < 0:
            pyprofile.incr('rewarder_protocol.latency.rtt.skew_unadjusted.negative')

        self.recv(context, message)
test_benchmarks.py 文件源码 项目:cyrapidjson 作者: thedrow 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_double_performance_float_precision(benchmark):
    """Benchmark rapidjson dumps/loads on ``doubles`` and print the timings."""
    print("\nArray with 256 doubles:")
    label = 'rapidjson (precise)'
    to_json = rapidjson.dumps
    from_json = rapidjson.loads

    ser_data, des_data = benchmark(
        run_client_test,
        label, to_json, from_json,
        data=doubles,
        iterations=50000,
    )

    total = ser_data + des_data
    print("%-11s serialize: %0.3f  deserialize: %0.3f  total: %0.3f" % (
        label, ser_data, des_data, total
    ))
models.py 文件源码 项目:aurora 作者: carnby 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def from_tweets(cls, tweets, metadata=None, **kwargs):
        """
        Create and save an instance whose ``json`` field describes the given tweets.

        :param tweets: an iterable of tweets, serialized with the "json" serializer
        :param metadata: stored under the ``metadata`` key of the JSON document
        :param kwargs: extra per-tweet attributes; each value is a sequence that
                       is merged into the tweets' ``fields`` only when its length
                       equals the number of tweets. should be json serializable.
        :return: the saved instance
        """
        tl = cls()

        serialized = serializers.serialize("json", tweets)
        json_tweets = json.loads(serialized)

        for key, values in kwargs.items():
            # sequences of the wrong length are skipped
            if len(values) == len(json_tweets):
                for tweet, value in zip(json_tweets, values):
                    tweet['fields'][key] = value

        # saved once before building the document, which reads ``tl.pk``
        tl.save()

        tl.json = json.dumps({
            'metadata': metadata,
            'tweets': json_tweets,
            'pk': tl.pk,
            'created_at': tl.datetime.isoformat(),
        })
        tl.save()
        return tl
test_healthchecks.py 文件源码 项目:monasca-events-api 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_should_report_healthy_if_kafka_healthy(self, kafka_check):
        """A healthy kafka check yields HTTP OK and ``{'kafka': 'OK'}``."""
        healthy = healthcheck.CheckResult(True, 'OK')
        kafka_check.healthcheck.return_value = healthy
        self.resource._kafka_check = kafka_check

        response = self.simulate_request(
            ENDPOINT,
            headers={'Content-Type': 'application/json'},
            decode='utf8',
            method='GET')
        self.assertEqual(falcon.HTTP_OK, self.srmock.status)

        payload = json.loads(response)
        self.assertIn('kafka', payload)
        self.assertEqual('OK', payload.get('kafka'))
test_healthchecks.py 文件源码 项目:monasca-events-api 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_should_report_unhealthy_if_kafka_unhealthy(self, kafka_check):
        """A failing kafka check yields HTTP 503 and the error text under ``kafka``."""
        url = 'localhost:8200'
        err_str = 'Could not connect to kafka at %s' % url
        unhealthy = healthcheck.CheckResult(False, err_str)
        kafka_check.healthcheck.return_value = unhealthy
        self.resource._kafka_check = kafka_check

        response = self.simulate_request(
            ENDPOINT,
            headers={'Content-Type': 'application/json'},
            decode='utf8',
            method='GET')
        self.assertEqual(falcon.HTTP_SERVICE_UNAVAILABLE, self.srmock.status)

        payload = json.loads(response)
        self.assertIn('kafka', payload)
        self.assertEqual(err_str, payload.get('kafka'))
SQLDBClient6.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def genericLazyConfig(db, dbKey, serviceName, clusterName):
    """Load the SQL connection settings stored as JSON under ``dbKey``.

    :param db: key/value store offering ``exists(key)`` and ``get(key)``
    :param dbKey: key holding the JSON encoded configuration
    :param serviceName: only used in the error message
    :param clusterName: only used in the error message
    :return: ``(host, database, user, password)``
    :raises RuntimeError: when ``dbKey`` does not exist
    :raises ValueError: when the stored value is not valid JSON
    :raises KeyError: when a required setting is missing
    """
    if not db.exists(dbKey):
        raise RuntimeError("No SQL connection configures for "
                           "service %s of cluster %s" % (
                               serviceName, clusterName))

    rawConfig = db.get(dbKey)

    try:
        config = json.loads(rawConfig)
    except ValueError as e:
        # Report the raw value: ``config`` is not bound when decoding fails,
        # so formatting it here would hide the real error.
        raise ValueError("Failed to decode SQL DB config %s: %s" % (rawConfig, e))

    try:
        host = config["host"]
        database = config["database"]
        user = config["user"]
        password = config["password"]
    except KeyError as e:
        raise KeyError("Missing a required SQL config key in config %s: %s" % (config, e))

    return host, database, user, password
system_contentmanager.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def notifySpaceModification(self, id, **args):
        """
        Reset the loader of a space and, when a webhook payload is present,
        pull and update the matching mercurial checkout.

        param:id id of space which changed
        param:args must contain "ctx"; ctx.params may hold a JSON "payload"
                   with repository.owner and repository.name
        result None

        The owner and name come from the request, so they are rejected when
        they could leave /opt/code (empty, ".", ".." or containing "/") and
        the directory is shell-quoted before it is put in the command.
        Raises ValueError for a rejected owner or name.
        """
        try:
            from shlex import quote
        except ImportError:  # Python 2
            from pipes import quote

        id=id.lower()
        loaders = j.core.portal.active.spacesloader
        loader = loaders.getLoaderFromId(id)
        loader.reset()

        ctx=args["ctx"]

        if "payload" in ctx.params:

            payload=ujson.loads(ctx.params["payload"])

            owner="%s"%payload["repository"]["owner"]
            name="%s"%payload["repository"]["name"]

            for part in (owner, name):
                if part in ("", ".", "..") or "/" in part:
                    raise ValueError("Invalid repository path component: %r" % part)

            # quote() leaves plain names unchanged and neutralises shell
            # metacharacters such as ';' or '$(...)' in anything else
            repodir=quote("/opt/code/%s/%s"%(owner,name))
            cmd="cd %s;hg pull;hg update -C"%repodir
            print("execute %s"%cmd)
            j.system.process.execute(cmd)
1_tableforarg.py 文件源码 项目:jumpscale_portal 作者: jumpscale7 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main(j, args, params, tags, tasklet):
    """Render the JSON object in the ``data`` tag as a two-column wiki table.

    The output starts with the ``title`` tag in bold, followed by one
    ``|key|value|`` row per entry; falsy values are shown as an empty cell
    and values are HTML-escaped. If the data cannot be decoded or rendered
    the output is an empty string. The result is stored on
    ``params.result`` as ``(out, doc)`` and ``params`` is returned.
    """
    params.merge(args)
    doc = params.doc
    data = args.getTag('data')
    title = args.getTag('title')

    out = "*%s*\n" % title
    try:
        objargs = json.loads(data)
        # items() exists on Python 2 and 3; iteritems() only on Python 2, where
        # its AttributeError was swallowed below and the table came out empty
        for key, value in objargs.items():
            if not value:
                value = ''
            out += "|%s|%s|\n" % (str(key), j.html.escape(str(value)))
    except Exception:
        # deliberate best effort: bad macro input renders nothing
        out = ''
    params.result = (out, doc)
    return params
postgres_driver.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def create_random_vote(self):
        """Create a secret for voting on a randomly sampled sticker.

        This is done to prevent automatic or robotic voting. A sticker row
        is sampled, a token is derived with md5 from a random number, and
        the token, current time and two sticker fields are stored in the
        ``secret`` table.

        Declared ``async`` because the body awaits the database calls. The
        stored JSON is passed as a query argument rather than formatted into
        the SQL text, so quotes inside the sticker data cannot break or
        alter the statement.

        :return: ``(random_sticker[2], token)``
        """
        random_number = random.randrange(0, 100000)
        current_time = int(time.time())
        result = await self.db.fetchrow(
            # 'select * from stickers order by random() limit 1;'
            'SELECT * FROM stickers TABLESAMPLE SYSTEM_ROWS(1);'
        )
        random_sticker = json.loads(result[1])
        # random_number is an int produced locally, so formatting it is safe
        token = await self.db.fetchval(
            "select md5('{}');".format(random_number))
        secret = json.dumps([
            token,
            current_time,
            random_sticker[0],
            random_sticker[2]
        ])
        await self.db.fetch(
            "insert into secret (data) values ($1)", secret)
        return (random_sticker[2], token)
postgres_driver.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def get_statistics(self):
        """Collect the site counters and the sticker and pack totals.

        The ``server`` row holds a JSON array; its items 1, 2 and 3 are
        reported as users, clicks and votes. Sticker and pack totals come
        from ``count`` queries.

        Declared ``async`` because the body awaits the database calls.

        :return: dict with ``users``, ``clicks``, ``votes``, ``packs_count``
                 and ``stickers_count``
        """
        sql = "select data from server"
        server = await self.db.fetchrow(sql)
        server = json.loads(server[0])
        sql = "SELECT count (data) FROM stickers"
        stickers_count = await self.db.fetchval(sql)
        sql = "SELECT count (data) FROM packs"
        packs_count = await self.db.fetchval(sql)
        statistics = {
            "users": server[1],
            "clicks": server[2],
            "votes": server[3],
            "packs_count": packs_count,
            "stickers_count": stickers_count,
        }
        logging.debug('Get statistics: %s', statistics)
        return statistics

    # The functions below belong to the Stickers API and are not used by the main site
wamp.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def on_message(self, message):
        """Decode an incoming JSON frame and dispatch it on its first element.

        A three-element prefix frame registers the prefix and returns
        ``None``. Call frames go to ``self.rpc_call`` and
        subscribe/unsubscribe/publish frames to ``self.pubsub_action``; their
        results are returned.

        Raises ``Exception`` when the frame is not a list or matches no
        known message type.
        """
        frame = json.loads(message)
        if not isinstance(frame, list):
            raise Exception('incoming data is no list')

        if frame[0] == self.MSG_PREFIX and len(frame) == 3:
            self.prefixes.add(frame[1], frame[2])
            return None

        if frame[0] == self.MSG_CALL and len(frame) >= 3:
            return self.rpc_call(frame)

        pubsub_types = (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE,
                        self.MSG_PUBLISH)
        if frame[0] in pubsub_types:
            return self.pubsub_action(frame)

        raise Exception("Unknown call")
types.py 文件源码 项目:PythonTelegram 作者: YongJang 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_json(json_type):
        """
        Return ``json_type`` as a dict.

        A dict (or dict subclass) is returned as-is; a string (or string
        subclass) is decoded with ``json.loads``. ``isinstance`` is used
        instead of an exact ``type()`` comparison so that subclasses such as
        ``OrderedDict`` are accepted.

        :param json_type: a dict or a JSON string
        :return: the dict itself, or the decoded JSON value
        :raises ValueError: when ``json_type`` is neither a dict nor a string
        """
        try:
            str_types = (str, unicode)
        except NameError:
            # Python 3 has no separate ``unicode`` type
            str_types = (str,)

        if isinstance(json_type, dict):
            return json_type
        elif isinstance(json_type, str_types):
            return json.loads(json_type)
        else:
            raise ValueError("json_type should be a json dict or string.")
common.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _request(method, path, type, **kwargs):
    url = 'http://localhost:8000%s' % path
    res = method(url, **kwargs)
    res.raise_for_status()
    if type == 'html':
        assert 'text/html' in res.headers['Content-Type']
        doc = bs4.BeautifulSoup(res.text, 'html5lib')
    elif type == 'text':
        assert 'text/plain' in res.headers['Content-Type']
        doc = res.text
    elif type == 'json':
        assert ('application/json' in res.headers['Content-Type'] or
                'text/plain' in res.headers['Content-Type'])
        doc = ujson.loads(res.text)
    else:
        assert False, type
    return (res, doc)


问题


面经


文章

微信
公众号

扫码关注公众号