python类loads()的实例源码

__init__.py 文件源码 项目:mixpanel_api 作者: mixpanel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def query_jql(self, script, params=None):
        """Run a JQL script through the Mixpanel JQL API and return the decoded reply.

        https://mixpanel.com/help/reference/jql/api-reference#api/access

        :param script: String containing a JQL script to run
        :param params: Optional dict; when given it is JSON-encoded and sent as the
            "params" field, which the script sees as its params global variable.
        :type script: str
        :type params: dict
        :return: the response body as parsed by json.loads

        """
        if params is None:
            payload = {"script": script}
        else:
            # The API expects the params object as a JSON string, not a nested dict.
            payload = {"script": script, "params": json.dumps(params)}

        raw = self.request(Mixpanel.FORMATTED_API, ['jql'], payload, method='POST')
        return json.loads(raw)
__init__.py 文件源码 项目:mixpanel_api 作者: mixpanel 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _get_engage_page(self, params):
        """Request one page from the /engage endpoint and decode it.

        :param params: Query parameters for the /engage API
        :type params: dict
        :return: the decoded /engage response when it carries a 'results' key;
            otherwise None (a warning containing the raw response is logged)
        :rtype: dict

        """
        raw = self.request(Mixpanel.FORMATTED_API, ['engage'], params)
        page = json.loads(raw)
        if 'results' not in page:
            # A reply without 'results' is treated as invalid: log it and give up.
            Mixpanel.LOGGER.warning("Invalid response from /engage: " + raw)
            return None
        return page
tfidf.py 文件源码 项目:newsgraph 作者: exchez 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def calculate_query_bin_bits(tfidf):
    """Compute the sign bits of a tf-idf query against a set of projection vectors.

    The stored word list ('map'), the dimension ('dim') and the table ('table')
    are read from the module-level redis client ``r``. The query weights are
    aligned to the stored word list with a left merge, so words that are in the
    list but absent from ``tfidf`` get a weight of 0.

    :param tfidf: mapping of word -> tf-idf weight for the query
    :return: tuple ``(query_bin_bits, table)`` where ``query_bin_bits`` is the
        elementwise ``>= 0`` test of the projection and ``table`` is the decoded
        redis table passed through ``str2int``
    """
    # TODO: this also needs to return the table from redis as well as the bin id
    table = str2int(ujson.loads(r.get('table')))
    dim = int(r.get('dim'))
    vocabulary = pd.DataFrame({'word': ujson.loads(r.get('map'))})
    num_vectors = 16

    weights = pd.DataFrame({'word': list(tfidf.keys()), 'value': list(tfidf.values())})

    # Left merge keeps the stored word order; missing words become 0 via fillna.
    article_representation = pd.merge(vocabulary, weights, on='word', how='left').fillna(0)['value']

    # NOTE(review): presumably bin_vectors has shape (dim, num_vectors) so the dot
    # product yields one value per vector - confirm against generate_random_vectors.
    bin_vectors = generate_random_vectors(num_vectors, dim)
    query_bin_bits = article_representation.dot(bin_vectors) >= 0

    return query_bin_bits, table
module_ujson.py 文件源码 项目:dataIO-project 作者: MacHu-GWU 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_all(self):
        """Round-trip a mixed-type dict through ujson and compare it field by field."""
        data = {
            "int": 100,
            "float": 3.1415926535,
            "str": "string example ?????",
            "bytes": "bytes example ?????".encode("utf-8"),
            "boolean": True,
            "datetime": datetime.now()
        }
        js = ujson.dumps(data)
        decoded = ujson.loads(js)

        self.assertEqual(data["int"], decoded["int"])
        self.assertAlmostEqual(data["float"], decoded["float"], delta=0.0001)
        self.assertEqual(data["str"], decoded["str"])
        # bytes are expected NOT to come back as the same value
        self.assertNotEqual(data["bytes"], decoded["bytes"])
        self.assertEqual(data["boolean"], decoded["boolean"])
        # datetime is likewise expected to differ after the round trip
        self.assertNotEqual(data["datetime"], decoded["datetime"])

        print(ujson.dumps(data, indent=4))


#--- Unittest ---
devtools.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_target_event(self, event, msg):
        """Handle the Target.* dev tools events this agent cares about.

        'attachedToTarget': service-worker targets are remembered in self.workers
        (and get Network.enable while recording); every attached target that has a
        targetId is then told to run if it is waiting for the debugger.
        'receivedMessageFromTarget': the embedded JSON message is decoded and
        forwarded to self.process_message together with its targetId.
        """
        if event == 'attachedToTarget':
            params = msg['params']
            if 'targetInfo' in params and 'targetId' in params['targetInfo']:
                target = params['targetInfo']
                is_service_worker = 'type' in target and target['type'] == 'service_worker'
                if is_service_worker:
                    self.workers.append(target)
                    if self.recording:
                        self.send_command('Network.enable', {}, target_id=target['targetId'])
                self.send_command('Runtime.runIfWaitingForDebugger', {},
                                  target_id=target['targetId'])
        if event == 'receivedMessageFromTarget':
            params = msg['params']
            if 'message' in params and 'targetId' in params:
                raw_message = params['message']
                # Only the first 200 characters are logged to keep debug output short.
                logging.debug(raw_message[:200])
                target_id = params['targetId']
                self.process_message(json.loads(raw_message), target_id=target_id)
desktop_browser.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def process_pcap(self):
        """Run the bundled pcap parser on the gzipped capture and record byte counts.

        Does nothing when ``self.pcap_file + '.gz'`` does not exist. Otherwise
        ``support/pcap-parser.py`` (next to this module) is run with ``--json`` and
        the 'in', 'out' and 'in_dup' values from its JSON output, when present, are
        copied into ``self.task['page_data']``.

        This is best-effort: any failure (parser missing, non-zero exit, invalid
        JSON, ...) is logged with its traceback and otherwise ignored, so a broken
        capture never aborts the caller.
        NOTE(review): presumably invoked from a background thread - confirm at the
        call site.
        """
        pcap_file = self.pcap_file + '.gz'
        if not os.path.isfile(pcap_file):
            return

        path_base = os.path.join(self.task['dir'], self.task['prefix'])
        slices_file = path_base + '_pcap_slices.json.gz'
        pcap_parser = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                   'support', "pcap-parser.py")
        cmd = ['python', pcap_parser, '--json', '-i', pcap_file, '-d', slices_file]
        logging.debug(cmd)
        # Parser output key -> page_data key.
        fields = (('in', 'pcapBytesIn'),
                  ('out', 'pcapBytesOut'),
                  ('in_dup', 'pcapBytesInDup'))
        try:
            stdout = subprocess.check_output(cmd)
            if stdout is not None:
                result = json.loads(stdout)
                if result:
                    for source_key, page_key in fields:
                        if source_key in result:
                            self.task['page_data'][page_key] = result[source_key]
        except Exception:
            # Previously swallowed silently, which made parser failures invisible.
            logging.exception('Error processing pcap file %s', pcap_file)
message_server.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post(self):
        """Handle POST messages.

        The request body is either logged verbatim (uri '/log') or treated as
        newline-delimited JSON, each non-empty line being decoded and handed to
        MESSAGE_SERVER.handle_message. Messages without a 'body' key get
        ``body = None`` unless they were posted to '/etw'.

        The handler always answers 200. A failure while processing the body is
        logged with its traceback and stops processing of the remaining lines.
        """
        import ujson as json
        try:
            messages = self.request.body
            if messages:
                if self.request.uri == '/log':
                    logging.debug(messages)
                else:
                    for line in messages.splitlines():
                        line = line.strip()
                        if not line:
                            continue
                        message = json.loads(line)
                        if 'body' not in message and self.request.uri != '/etw':
                            message['body'] = None
                        MESSAGE_SERVER.handle_message(message)
        except Exception:
            # Stay best-effort (the client still gets a 200) but leave a trace
            # instead of silently discarding malformed payloads.
            logging.exception('Error processing posted messages')
        self.set_status(200)
mc_tasks.py 文件源码 项目:mediachain-indexer 作者: mediachain 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _inner_iter_compactsplit(line, skip_callback):
    #print ('CSLINE',line[:40])
    #t0 = time()
    new_id, dd = line.strip('\n').split('\t', 1)

    rec = u_loads(dd)

    if skip_callback is not False:
        ## Return False to skip:
        sk = skip_callback(rec['_id'])
        if sk is False:
            ## todo re-add fastforward
            return False

        got_any = True

        #print ('INNER',(time() - t0) * 1000, 'ms')

        #yield sk, rec
        return sk, rec
    else:
        #print ('INNER',(time() - t0) * 1000, 'ms')
        #yield rec
        return rec
db_pool.py 文件源码 项目:crawler 作者: dragonflylxp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def Cache(redis_key, expire=15 * 60, use_cache=True):
    """Decorator factory that caches a method's result in redis as JSON.

    :param redis_key: key template; when the decorated method is called with
        positional arguments (besides ``self``) it is filled in with
        ``redis_key.format(*args)``, otherwise it is used as-is.
    :param expire: passed as the third argument to ``redis_conn.set``
        (presumably the expiry in seconds - confirm against the redis client).
    :param use_cache: when false the stored value is never returned, but the
        fresh result is still written back.

    A stored value is only served when it is non-empty and both ``use_cache``
    and the module-level ``config_cache`` flag are truthy. Results must be
    serialisable by ``ujson``.
    """
    import functools

    def deco(func):
        # wraps() keeps the decorated method's __name__/__doc__ intact.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            cache_key = redis_key.format(*args) if args else redis_key
            redis_conn = get_redis("main")
            cached = redis_conn.get(cache_key)
            if cached and use_cache and config_cache:
                return ujson.loads(cached)

            result = func(self, *args, **kwargs)
            redis_conn.set(cache_key, ujson.dumps(result), expire)
            return result
        return wrapper
    return deco
test_request_data.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_storage():
    """Keys set on the request by middleware reach the handler; deleted keys do not."""
    app = Sanic('test_text')

    @app.middleware('request')
    def store(request):
        request['user'] = 'sanic'
        request['sidekick'] = 'tails'
        # Removed again so the handler must see it as missing.
        del request['sidekick']

    @app.route('/')
    def handler(request):
        payload = {'user': request.get('user'), 'sidekick': request.get('sidekick')}
        return json(payload)

    _, response = app.test_client.get('/')
    body = loads(response.text)

    assert body['user'] == 'sanic'
    assert body.get('sidekick') is None
test_request_data.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_app_injection():
    """An attribute set on the app in a server-start listener is visible via request.app."""
    app = Sanic('test_app_injection')
    expected = random.choice(range(100))

    @app.listener('after_server_start')
    async def inject_data(app, loop):
        app.injected = expected

    @app.get('/')
    async def handler(request):
        payload = {'injected': request.app.injected}
        return json(payload)

    _, response = app.test_client.get('/')
    body = loads(response.text)

    assert body['injected'] == expected
wamp.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_message(self, message):
        """Decode an incoming JSON message and dispatch it on its leading type code.

        :param message: JSON text that must decode to a list whose first element
            is the message type.
        :return: the result of ``self.rpc_call`` / ``self.pubsub_action`` for call
            and pub/sub messages; ``None`` for a prefix registration.
        :raises Exception: when the payload is not a list, is an empty list, or
            does not match any known message type/length.
        """
        data = json.loads(message)

        if not isinstance(data, list):
            raise Exception('incoming data is no list')

        if not data:
            # No type code to dispatch on; report it like any other unknown
            # message instead of failing with an IndexError on data[0].
            raise Exception("Unknown call")

        message_type = data[0]

        if message_type == self.MSG_PREFIX and len(data) == 3:
            prefix, uri = data[1:3]
            self.prefixes.add(prefix, uri)

        elif message_type == self.MSG_CALL and len(data) >= 3:
            return self.rpc_call(data)

        elif message_type in (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE,
                              self.MSG_PUBLISH):
            return self.pubsub_action(data)
        else:
            raise Exception("Unknown call")
test_views.py 文件源码 项目:docsbox 作者: dveselov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_task_by_valid_uuid(self):
        """Submitting a file queues a task that can then be fetched by its id."""
        sample_path = os.path.join(self.samples, "sample.docx")
        submit_response = self.submit_file(sample_path, {
            "formats": ["txt"]
        })
        task = ujson.loads(submit_response.data)
        self.assertEqual(submit_response.status_code, 200)
        self.assertTrue(task.get("id"))
        self.assertEqual(task.get("status"), "queued")

        # Fetch the same task back through the public endpoint.
        lookup_response = self.client.get("/api/v1/{0}".format(task.get("id")))
        self.assertEqual(lookup_response.status_code, 200)
        expected = {
            "id": task.get("id"),
            "status": "queued",
            "result_url": None,
        }
        self.assertEqual(ujson.loads(lookup_response.data), expected)
request.py 文件源码 项目:kaira 作者: mulonemartin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def json_loads(data):
            """Decode a bytes payload to text and parse it as JSON."""
            # json.loads on Python 3.5 accepts str only, so decode the bytes first.
            text = data.decode()
            return json.loads(text)
http_client.py 文件源码 项目:Mk3-Firmware 作者: emfcamp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def json(self):
        """Return this object's ``text`` parsed as JSON."""
        parsed = ujson.loads(self.text)
        return parsed

    # Writes content into a file. This function will write while receiving, which avoids
    # having to load all content into memory
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def deserialize(value):
        """Return ``value`` decoded by the module's ``loads`` function."""
        decoded = loads(value)
        return decoded
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def deserialize(value):
        """Decode ``value`` with ``loads``, using ``json_util.object_hook`` for objects."""
        hook = json_util.object_hook
        return loads(value, object_hook=hook)
parsers.py 文件源码 项目:anapioficeandfire-python 作者: joakimskoog 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def parse(self, method, data):
        """Parse ``data`` as JSON and return the result.

        ``method`` is accepted but not used by this implementation.
        Any failure while parsing is reported as AnApiOfIceAndFireError.
        """
        try:
            return json.loads(data)
        except Exception:
            raise AnApiOfIceAndFireError('Failed to parse JSON data')
request.py 文件源码 项目:mobot 作者: JokerQyou 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _parse(json_data):
        """Try and parse the JSON returned from Telegram.

        Returns:
            dict: A JSON parsed as Python dict with results - on error this dict will be empty.

        """
        decoded_s = json_data.decode('utf-8')
        try:
            data = json.loads(decoded_s)
        except ValueError:
            raise TelegramError('Invalid server response')

        if not data.get('ok'):
            description = data.get('description')
            parameters = data.get('parameters')
            if parameters:
                migrate_to_chat_id = parameters.get('migrate_to_chat_id')
                if migrate_to_chat_id:
                    raise ChatMigrated(migrate_to_chat_id)
                retry_after = parameters.get('retry_after')
                if retry_after:
                    raise RetryAfter(retry_after)
            if description:
                return description

        return data['result']
orm.py 文件源码 项目:girlfriend 作者: chihongze 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _create_engine(self, config, section, all_pool_config):
        """Build an engine from one config section.

        :param config: mapping of section name -> settings; the section must
            provide "pool_policy", "connect_url", "encoding" and "connect_args"
            (the latter as a JSON string).
        :param section: name of the section to use
        :param all_pool_config: mapping of pool policy name -> pool settings
        :return: the object returned by create_engine

        The pool settings are forwarded as keyword arguments only when a pool
        policy is set and its "poolclass" is not NullPool.
        """
        settings = config[section]
        policy = config[section]["pool_policy"]

        pool_kwargs = {}
        if policy is not None:
            candidate = all_pool_config[policy]
            if candidate["poolclass"] != NullPool:
                pool_kwargs = candidate

        url = settings["connect_url"]
        encoding = settings["encoding"]
        connect_args = ujson.loads(settings["connect_args"])
        return create_engine(url, encoding=encoding, connect_args=connect_args, **pool_kwargs)


问题


面经


文章

微信
公众号

扫码关注公众号