python类loads()的实例源码

utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def restartJobs(args, config):  # TODO: reimplement
    """Re-publish job requests to the 'WAIT_Q' queue.

    When ``args.jobId`` is truthy, the request returned by ``getJobInfo`` for
    that job id is published. When ``args.preempted`` is truthy, every job
    whose ``current_status`` is "PREEMPTED" is published as well. Both options
    may be set in the same call.

    :param args: object exposing the ``jobId`` and ``preempted`` attributes
    :param config: configuration handed to ``PipelineDbUtils``
    """
    dbUtils = PipelineDbUtils(config)
    waitQueue = PipelineQueueUtils('WAIT_Q')

    def publish(jobId, rawRequest):
        # The request is stored as JSON text; decode it so that it is embedded
        # in the message as an object rather than as an escaped string.
        waitQueue.publish(json.dumps({
            "job_id": jobId,
            "request": json.loads(rawRequest)
        }))

    if args.jobId:
        rows = dbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})
        publish(args.jobId, rows[0].request)

    if args.preempted:
        preemptedRows = dbUtils.getJobInfo(select=["job_id", "request"], where={"current_status": "PREEMPTED"})
        for row in preemptedRows:
            publish(row.job_id, row.request)
import_cards.py 文件源码 项目:django-magic-cards 作者: pbaranay 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def fetch_data():
    """Download the MTGJSON archive and return its parsed contents.

    The archive is requested from ``MTG_JSON_URL``; if that request raises
    ``requests.ConnectionError``, ``FALLBACK_MTG_JSON_URL`` is requested
    instead.

    :return: the object decoded from the single JSON file inside the archive
    :raises RuntimeError: if the archive does not contain exactly one file
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw_bytes = archive.read(members[0])

    return json.loads(raw_bytes.decode('utf-8'))
get_data.py 文件源码 项目:X-ray-classification 作者: bendidi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Visit every URL in ``url_list`` and run ``extract`` on each linked node.

    For each page, the text of the first ``<script language="javascript">``
    element is searched with ``regex``; the first match is parsed as JSON and
    each entry's ``nodeRef`` is appended to ``domain`` to form a link that is
    passed to ``extract``. Pages whose download fails are skipped.
    """
    for url in url_list:
        try:
            r = requests.get(url)
        except requests.RequestException:
            # Skip only request failures. A bare ``except`` here would also
            # swallow KeyboardInterrupt and SystemExit.
            continue
        tree = html.fromstring(r.text)

        script = tree.xpath('//script[@language="javascript"]/text()')[0]

        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)

        # Build every link before extracting, so a malformed entry fails
        # before any extraction work has started for this page.
        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)
test_vm_cpu_allocated_agg.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_transform_specs_json_by_project(self):
    """Return the ``vcpus_project`` transform spec, parsed, in a one-item list."""
    raw_spec = """
        {"aggregation_params_map":{
               "aggregation_pipeline":{"source":"streaming",
                                       "usage":"fetch_quantity",
                                       "setters":["rollup_quantity",
                                                  "set_aggregated_metric_name",
                                                  "set_aggregated_period"],
                                       "insert":["prepare_data",
                                                 "insert_data"]},
               "aggregated_metric_name": "vcpus_agg",
               "aggregation_period": "hourly",
               "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
               "usage_fetch_operation": "latest",
               "setter_rollup_group_by_list": ["tenant_id"],
               "setter_rollup_operation": "sum",

               "dimension_list":["aggregation_period",
                                 "host",
                                 "project_id"]
         },
         "metric_group":"vcpus_project",
         "metric_id":"vcpus_project"}"""
    parsed_spec = json.loads(raw_spec)
    return [parsed_spec]
zhihu.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_answer(self, reponse):
    """Turn one page of the answers response into items and follow paging.

    Yields a ``ZhihuAnswerItem`` for every entry of ``data`` in the JSON body
    and, unless ``paging.is_end`` is truthy, a ``scrapy.Request`` for
    ``paging.next`` that is handled by this same callback.
    """
    payload = json.loads(reponse.text)
    paging = payload["paging"]
    is_end = paging["is_end"]
    next_url = paging["next"]

    # Map each answer record onto the item's fields.
    for answer in payload["data"]:
        item = ZhihuAnswerItem()
        item["zhihu_id"] = answer["id"]
        item["url"] = answer["url"]
        item["question_id"] = answer["question"]["id"]
        # The author id and the content may be absent; store None in that case.
        item["author_id"] = answer["author"].get("id")
        item["content"] = answer.get("content")
        item["parise_num"] = answer["voteup_count"]
        item["comments_num"] = answer["comment_count"]
        item["create_time"] = answer["created_time"]
        item["update_time"] = answer["updated_time"]
        item["crawl_time"] = datetime.datetime.now()

        yield item

    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def watchJob(jobId, exchangeName):
    """Poll the job's queue until a message arrives and report its status.

    A queue named ``PIPELINE_JOB_<jobId>`` is bound to ``exchangeName`` with
    ``jobId`` as the second argument, then polled in a loop.

    :return: ``jobId`` when the message's ``current_status`` is "SUCCEEDED"
    :raises PipelineServiceError: for any other ``current_status``
    """
    jobQueue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
    jobQueue.bindToExchange(exchangeName, jobId)

    while True:
        body, method = jobQueue.get()
        if not method:
            # Falsy ``method`` — presumably nothing was delivered; poll again.
            continue

        status = json.loads(body)["current_status"]
        if status != "SUCCEEDED":
            raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=status))
        return jobId
api.py 文件源码 项目:pyTBA 作者: Thing342 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def tba_get(self, path):
    """Query the TBA API and return the decoded JSON response.

    :param path: (str) Request path, without the API address prefix (https://www.thebluealliance.com/api/v2/)
    :return: The object parsed from the response body, or None when the body
        is not valid JSON (the URL and the body are printed in that case).
    :raises Exception: if the 'X-TBA-App-Id' entry of ``self.app_id`` is empty.
    """
    if self.app_id['X-TBA-App-Id'] == "":
        raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')

    url_str = 'https://www.thebluealliance.com/api/v2/' + path
    body = self.session.get(url_str, headers=self.app_id).text
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Print what was requested and what came back to aid debugging.
        print(url_str)
        print(body)
        return None
    return parsed
pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_gw_interfaces():
    '''
    Return the gateway interfaces configured for this unit.

    The 'external-interfaces' config option is parsed as JSON. The
    comma-separated entry stored under this unit's hostname is used, falling
    back to the 'DEFAULT' entry; if neither key is present the result is an
    empty list. Raises ValueError when the option is not valid JSON or when a
    listed interface does not exist.
    '''
    try:
        interfaces_by_host = json.loads(config('external-interfaces'))
    except ValueError:
        raise ValueError("Invalid json provided for gateway interfaces")

    hostname = get_unit_hostname()
    if hostname in interfaces_by_host:
        selected = interfaces_by_host[hostname].split(',')
    elif 'DEFAULT' in interfaces_by_host:
        selected = interfaces_by_host['DEFAULT'].split(',')
    else:
        selected = []

    for name in selected:
        if interface_exists(name):
            continue
        log('Provided gateway interface %s does not exist'
            % name)
        raise ValueError('Provided gateway interface does not exist')
    return selected
unitdata.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getrange(self, key_prefix, strip=False):
    """
    Fetch every key/value pair whose key starts with ``key_prefix``.

    :param str key_prefix: Common prefix among all keys
    :param bool strip: When true, drop the common prefix from the key
        names in the returned dict
    :return dict: A (possibly empty) dict mapping keys to JSON-decoded values
    """
    self.cursor.execute("select key, data from kv where key like ?",
                        ['%s%%' % key_prefix])
    rows = self.cursor.fetchall()
    if not rows:
        return {}

    # Number of leading characters to cut from each key (none unless strip).
    offset = len(key_prefix) if strip else 0
    return {key[offset:]: json.loads(data) for key, data in rows}
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def relation_get(attribute=None, unit=None, rid=None):
    """Run ``relation-get`` and return its JSON-decoded output.

    :param attribute: attribute to request; '-' is passed when not given
    :param unit: optional unit argument appended to the command
    :param rid: optional relation id, passed with ``-r``
    :return: the decoded output, or None when the output cannot be decoded
        or the command exits with return code 2
    """
    _args = ['relation-get', '--format=json']
    if rid:
        _args.extend(['-r', rid])
    _args.append(attribute or '-')
    if unit:
        _args.append(unit)
    try:
        output = subprocess.check_output(_args)
        return json.loads(output.decode('UTF-8'))
    except ValueError:
        return None
    except CalledProcessError as e:
        if e.returncode != 2:
            raise
        return None
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def status_get():
    """Retrieve the previously set juju workload state and message

    If the status-get command is not found then assume this is juju < 1.23 and
    return 'unknown', ""

    """
    cmd = ['status-get', "--format=json", "--include-data"]
    try:
        raw_status = subprocess.check_output(cmd)
    except OSError as e:
        # Only a missing executable is tolerated; other OS errors propagate.
        if e.errno != errno.ENOENT:
            raise
        return ('unknown', "")

    status = json.loads(raw_status.decode("UTF-8"))
    return (status["status"], status["message"])
ceph.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_cache_mode(service, pool_name):
    """
    Look up the cache mode that ``ceph osd dump`` reports for a pool.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
        matches, or None when no pool matches
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    # A ValueError raised while decoding the output propagates to the caller.
    pools = json.loads(out)['pools']
    return next(
        (pool['cache_mode'] for pool in pools if pool['pool_name'] == pool_name),
        None)
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_rmq_cluster_running_nodes(self, sentry_unit):
    """Extract the running node names from rabbitmqctl cluster_status output.

    :param sentry_unit: sentry unit passed on to get_rmq_cluster_status
    :returns: List containing node names of running nodes; empty when the
              status text does not mention running_nodes
    """
    # NOTE(beisner): rabbitmqctl cluster_status output is not
    # json-parsable, do string chop foo, then json.loads that.
    status_text = self.get_rmq_cluster_status(sentry_unit)
    if 'running_nodes' not in status_text:
        return []

    marker = "{running_nodes,"
    # The node list starts right after the marker and ends at the first "]".
    begin = status_text.find(marker) + len(marker)
    end = status_text.find("]},", begin) + 1
    node_list_text = status_text[begin:end].replace("'", '"')
    return json.loads(node_list_text)
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_action(self, unit_sentry, action,
               _check_output=subprocess.check_output,
               params=None):
    """Run the named action on a given unit sentry.

    params a dict of parameters to use; each entry is appended to the
    command line as ``key=value``.
    _check_output parameter is used for dependency injection.

    @return action_id.
    """
    unit_id = unit_sentry.info["unit_name"]
    command = ["juju", "action", "do", "--format=json", unit_id, action]
    if params is not None:
        # dict.items() exists on both Python 2 and Python 3, whereas
        # iteritems() is Python 2 only and raises AttributeError on Python 3.
        for key, value in params.items():
            command.append("{}={}".format(key, value))
    self.log.info("Running command: %s\n" % " ".join(command))
    output = _check_output(command, universal_newlines=True)
    data = json.loads(output)
    action_id = data[u'Action queued with id']
    return action_id
test_views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_view_responds_stats_on(self):
    """The nodeinfo document reports user, post and comment counts under "usage"."""
    self.get(NODEINFO_DOCUMENT_PATH)
    self.response_200()
    usage = json.loads(decode_if_bytes(self.last_response.content))["usage"]

    # Cut-off timestamps for the two "active user" windows.
    half_year_ago = now() - datetime.timedelta(days=180)
    month_ago = now() - datetime.timedelta(days=30)

    expected = {
        "users": {
            "total": User.objects.count(),
            "activeHalfyear": User.objects.filter(last_login__gte=half_year_ago).count(),
            "activeMonth": User.objects.filter(last_login__gte=month_ago).count(),
        },
        "localPosts": Content.objects.filter(
            author__user__isnull=False, content_type=ContentType.CONTENT).count(),
        "localComments": Content.objects.filter(
            author__user__isnull=False, content_type=ContentType.REPLY).count(),
    }
    self.assertEqual(usage, expected)
pull_prod.py 文件源码 项目:ELO-Darts 作者: pwgraham91 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def pull_user_data(session):
    """Copy every exported user into ``session``.

    Fetches ``export/users`` below ``config.prod_url``, creates one ``User``
    per exported record and commits once after all of them have been added.

    :param session: object providing ``add`` and ``commit``
    """
    # print(...) with a single argument prints the same text on Python 2 and
    # is valid syntax on Python 3, unlike the print statement used before.
    print('pulling users')
    user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
    loaded_data = json.loads(user_data.text)
    # Fields copied verbatim from each exported record onto the new User.
    user_fields = ('id', 'name', 'email', 'admin', 'avatar', 'active',
                   'created_at', 'elo', 'wins', 'losses')
    for user_dict in loaded_data:
        user = User(**{field: user_dict[field] for field in user_fields})
        session.add(user)
    session.commit()
    print('done pulling users')
pull_prod.py 文件源码 项目:ELO-Darts 作者: pwgraham91 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def pull_game_data(session):
    """Copy every exported game into ``session``.

    Fetches ``export/games`` below ``config.prod_url``, creates one ``Game``
    per exported record and commits once after all of them have been added.

    :param session: object providing ``add`` and ``commit``
    """
    # print(...) with a single argument prints the same text on Python 2 and
    # is valid syntax on Python 3, unlike the print statement used before.
    print('pulling games')
    game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
    loaded_data = json.loads(game_data.text)
    # Fields copied verbatim from each exported record onto the new Game.
    game_fields = ('id', 'created_at', 'deleted_at', 'winner_id',
                   'winner_elo_score', 'loser_id', 'loser_elo_score',
                   'submitted_by_id')
    for game_dict in loaded_data:
        game = Game(**{field: game_dict[field] for field in game_fields})
        session.add(game)
    session.commit()
    print('done pulling games')
generate_ddl.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def generate(self, template_path, source_json_path, output_path):
    """Write the template followed by one value tuple per source record.

    Each non-blank line of ``source_json_path`` is parsed as JSON and
    rendered as a two-element tuple: the record's ``self.key_name`` entry and
    the record re-serialised with ``json.dumps``, both single-quoted. The
    output file holds the template text, the tuples separated by a comma and
    a newline, and a closing semicolon.

    :param template_path: file whose content is copied to the start of the output
    :param source_json_path: file with one JSON object per line
    :param output_path: file to (over)write
    """
    print("Generating content at %s with template at %s, using key %s" % (
        output_path, template_path, self.key_name))
    data = []
    with open(source_json_path) as f:
        for line in f:
            if not line.strip():
                # Tolerate blank lines such as a trailing newline at EOF.
                continue
            json_line = json.loads(line)
            data_line = '(\'%s\',\n\'%s\')' % (
                json_line[self.key_name], json.dumps(json_line))
            data.append(str(data_line))
    print(data)
    with open(template_path) as f:
        template = f.read()
    with open(output_path, 'w') as write_file:
        write_file.write(template)
        # Joining the records avoids having to strip a trailing separator
        # with a relative seek, which text-mode files reject on Python 3 and
        # which clipped the template itself when there were no records.
        write_file.write(',\n'.join(data))
        write_file.write(';')
test_vm_cpu_allocated_agg.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_transform_specs_json_by_all(self):
    """Return the ``vcpus_all`` transform spec, parsed, in a one-item list."""
    raw_spec = """
        {"aggregation_params_map":{
               "aggregation_pipeline":{"source":"streaming",
                                       "usage":"fetch_quantity",
                                       "setters":["rollup_quantity",
                                                  "set_aggregated_metric_name",
                                                  "set_aggregated_period"],
                                       "insert":["prepare_data",
                                                 "insert_data"]},
               "aggregated_metric_name": "vcpus_agg",
               "aggregation_period": "hourly",
               "aggregation_group_by_list": ["host", "metric_id"],
               "usage_fetch_operation": "latest",
               "setter_rollup_group_by_list": [],
               "setter_rollup_operation": "sum",

               "dimension_list":["aggregation_period",
                                 "host",
                                 "project_id"]
         },
         "metric_group":"vcpus_all",
         "metric_id":"vcpus_all"}"""
    parsed_spec = json.loads(raw_spec)
    return [parsed_spec]
test_fetch_quantity_util_agg_second_stage.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_pre_transform_specs_json(self):
    """Return the two ``cpu_util_all`` pre-transform specs as parsed JSON."""
    raw_specs = ["""
        {"event_processing_params":{"set_default_zone_to":"1",
                                    "set_default_geolocation_to":"1",
                                    "set_default_region_to":"W"},
         "event_type":"cpu.total_logical_cores",
         "metric_id_list":["cpu_util_all"],
         "required_raw_fields_list":["creation_time"],
         "service_id":"host_metrics"}""", """
        {"event_processing_params":{"set_default_zone_to":"1",
                                    "set_default_geolocation_to":"1",
                                    "set_default_region_to":"W"},
         "event_type":"cpu.idle_perc",
         "metric_id_list":["cpu_util_all"],
         "required_raw_fields_list":["creation_time"],
         "service_id":"host_metrics"}"""]
    parsed_specs = []
    for raw_spec in raw_specs:
        parsed_specs.append(json.loads(raw_spec))
    return parsed_specs
test_fetch_quantity_agg.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_transform_specs_json_invalid_name(self):
    """Return a parsed transform spec whose aggregated metric name is "&invalidmetricname"."""
    raw_spec = """
        {"aggregation_params_map":{
               "aggregation_pipeline":{"source":"streaming",
                                       "usage":"fetch_quantity",
                                       "setters":["rollup_quantity",
                                                  "set_aggregated_metric_name",
                                                  "set_aggregated_period"],
                                       "insert":["prepare_data",
                                                 "insert_data"]},
               "aggregated_metric_name": "&invalidmetricname",
               "aggregation_period": "hourly",
               "aggregation_group_by_list": ["host", "metric_id"],
               "usage_fetch_operation": "sum",
               "setter_rollup_group_by_list": ["host"],
               "setter_rollup_operation": "sum",
               "dimension_list":["aggregation_period",
                                 "host",
                                 "project_id"]
         },
         "metric_group":"mem_total_all",
         "metric_id":"mem_total_all"}"""
    parsed_spec = json.loads(raw_spec)
    return [parsed_spec]
test_fetch_quantity_util_agg.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_pre_transform_specs_json(self):
    """Return the two ``cpu_util_all`` pre-transform specs as parsed JSON."""
    raw_specs = ["""
        {"event_processing_params":{"set_default_zone_to":"1",
                                    "set_default_geolocation_to":"1",
                                    "set_default_region_to":"W"},
         "event_type":"cpu.total_logical_cores",
         "metric_id_list":["cpu_util_all"],
         "required_raw_fields_list":["creation_time"],
         "service_id":"host_metrics"}""", """
        {"event_processing_params":{"set_default_zone_to":"1",
                                    "set_default_geolocation_to":"1",
                                    "set_default_region_to":"W"},
         "event_type":"cpu.idle_perc",
         "metric_id_list":["cpu_util_all"],
         "required_raw_fields_list":["creation_time"],
         "service_id":"host_metrics"}"""]
    parsed_specs = []
    for raw_spec in raw_specs:
        parsed_specs.append(json.loads(raw_spec))
    return parsed_specs
test_suite_analyzer.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_error_response(self):
    """error() exits and emits a JSON report in which secret config values read 'REMOVED'."""
    secret_keys = ('password', 'key', 'apikey', 'api_key')

    # Before the error is raised the analyzer still sees the real values.
    for name in secret_keys:
        self.assertEqual(self.analyzer.get_param('config.' + name), "secret")

    # Run the error method
    with self.assertRaises(SystemExit):
        self.analyzer.error('Error', True)

    # Get the output
    report = json.loads(self.analyzer.fpoutput.getvalue().strip())

    self.assertEqual(report['success'], False)
    self.assertEqual(report['errorMessage'], 'Error')
    self.assertEqual(report['input']['dataType'], 'ip')
    self.assertEqual(report['input']['data'], '1.1.1.1')
    for name in secret_keys:
        self.assertEqual(report['input']['config'][name], 'REMOVED')
vmrayclient.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_sample(self, samplehash):
    """
    Fetch information about a sample identified by its hash.

    The hash type is chosen from the length of the hash: 32 characters for
    md5, 40 for sha1 and 64 for sha256.

    :param samplehash: hash to search for. Has to be either md5, sha1 or sha256
    :type samplehash: str
    :returns: Parsed JSON body of the response
    :rtype: dict
    :raises UnknownHashTypeError: if the hash has any other length
    :raises BadResponseError: if the response status is not 200
    """
    path_by_length = {32: 'md5/', 40: 'sha1/', 64: 'sha256/'}
    hash_path = path_by_length.get(len(samplehash))
    if hash_path is None:
        raise UnknownHashTypeError('Sample hash has an unknown length.')

    apiurl = '/rest/sample/' + hash_path
    res = self.session.get(self.url + apiurl + samplehash)
    if res.status_code != 200:
        raise BadResponseError('Response from VMRay was not HTTP 200.'
                               ' Responsecode: {}; Text: {}'.format(res.status_code, res.text))
    return json.loads(res.text)
vmrayclient.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def query_job_status(self, submissionid):
    """
    Ask VMRay whether a submission has finished.

    :param submissionid: ID of the job/submission
    :type submissionid: int
    :returns: True if ``data.submission_finished`` in the response is truthy,
              False otherwise
    :rtype: bool
    :raises UnknownSubmissionIdError: if the response status is not 200
    """
    result = self.session.get('{}{}{}'.format(self.url, '/rest/submission/', submissionid))
    if result.status_code != 200:
        raise UnknownSubmissionIdError('Submission id seems invalid, response was not HTTP 200.')

    submission_info = json.loads(result.text)
    # A missing 'data' or 'submission_finished' entry counts as "not finished".
    finished = submission_info.get('data', {}).get('submission_finished', False)  # Or something like that
    return True if finished else False
safebrowsing.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __query_safebrowsing(self, search_value, search_type):
    """
    POST a query to the safebrowsing api and decode the answer

    :param search_value: value to search for
    :type search_value: str
    :param search_type: 'url' or 'ip'
    :type search_type: str
    :return: The response body parsed as JSON
    """
    request_body = self.__prepare_body(
        search_value=search_value,
        search_type=search_type
    )
    response = self.session.post(self.url, json=request_body)
    return json.loads(response.text)
hippo.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def run(self):
    """POST the observable to the configured Hippocampe service and report the reply.

    The request body is a JSON object keyed by the value of ``getData()``
    whose entry carries ``self.data_type``. HTTP and URL errors are passed to
    ``self.error``; any other exception goes to ``self.unexpectedError``.
    """
    data = self.getData()
    payload = json.dumps({data: {"type": self.data_type}}).encode('utf-8')
    headers = {'Content-Type': 'application/json'}

    try:
        endpoint = '{}/hippocampe/api/v1.0/{}'.format(self.url, self.service)
        response = urllib2.urlopen(urllib2.Request(endpoint, payload, headers))
        self.report(json.loads(response.read()))
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
main.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def logic(data):
    """Record a JSON status report and mail an alert for each mount above 70.

    The raw report is appended to /tmp/test.txt and then decoded. For every
    entry of ``MOUNT`` whose ``used_rate`` exceeds 70, ``send_mail`` is called
    with the configured address, the mount path and the rate.

    :param data: JSON text with at least the ``MOUNT`` and ``NET`` keys
    :return: the string "OK"
    """
    # The context manager closes the log file even if a write fails.
    with open('/tmp/test.txt', 'a') as f1:
        f1.write(data)
        f1.write('\n')
    data = json.loads(data)
    # print(...) with one argument behaves the same on Python 2 and 3.
    print(type(data))
    mounts = data["MOUNT"]
    netifaces = data["NET"]
    for netif in netifaces:
        if netif["status"] == "up":
            # NOTE(review): ip is not used further down — confirm whether it
            # was meant to be part of the alert mail.
            ip = netif["ip"]
    for mount in mounts:
        mount_point = mount['path']
        use_rate = mount['used_rate']
        try:
            # Compare numerically: as strings, "100" sorts below "70" and
            # "8" above it. Assumes the rate is a number, optionally followed
            # by "%" — TODO confirm against the sender.
            over_limit = float(str(use_rate).rstrip('%')) > 70
        except ValueError:
            # Not numeric: keep the original string comparison.
            over_limit = use_rate > '70'
        if over_limit:
            # The undefined name ``used_rate`` was passed here before, which
            # raised NameError on the first alert.
            send_mail(recvmail_conf['addr'], "mount point" + mount_point, use_rate)
    return("OK")
json_rpc.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _reply(self, json_reply):
    """
    Handle a reply that came in over the transport provided

    Text that does not start with '{' is logged and ignored. A reply whose
    'jsonrpc' is not '2.0', or whose 'id' is missing or null, is logged as
    bad; if exactly one reply is pending, that one is failed with a
    JsonRpcException. Otherwise the pending entry for the reply's id is
    removed and its callback is fired with the decoded reply, unless the
    message is an echo of a request that is still pending.

    :param json_reply: raw text received from the transport
    """
    if not json_reply.startswith('{'):
        self.sdata.log('Received non-JSON data: "{}"'.format(json_reply))
        return
    reply = json.loads(json_reply, object_pairs_hook=OrderedDict)
    # .get() routes a reply without a 'jsonrpc' member to the bad-reply
    # path instead of raising KeyError.
    if reply.get('jsonrpc') != '2.0' or reply.get('id') is None:
        self.sdata.log('Received bad JSON-RPC reply: {}'.format(json_reply))
        if len(self.pending_reply_map) == 1:
            # lucky! can guess a pending reply to kill
            # next(iter(...)) works on Python 2 and 3; keys()[0] is
            # Python 2 only.
            this_id = next(iter(self.pending_reply_map))
            d = self.pending_reply_map[this_id]
            del self.pending_reply_map[this_id]
            e = JsonRpcException('Bad reply: {}'.format(json_reply))
            d.errback(e)
        return
    this_id = int(reply['id'])
    if 'method' in reply and this_id in self.pending_reply_map:
        self.sdata.log('Got echo of request for {}, ignoring'.format(this_id))
    else:
        d = self.pending_reply_map[this_id]
        del self.pending_reply_map[this_id]
        d.callback(reply)
kcdu.py 文件源码 项目:kcdu 作者: learhy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def create_cd(col_name, type, display_name):
    """Create a custom dimension through the Kentik API and return its id.

    The request payload is rendered from a JSON template and posted to the
    ``customdimension`` endpoint. Any status other than 201 prints the status
    code and the response's ``error`` field, then exits the program.

    :param col_name: value for the payload's ``name`` field
    :param type: value for the payload's ``type`` field
    :param display_name: value for the payload's ``display_name`` field
    :return: the ``id`` of the ``customDimension`` object in the response
    """
    url = 'https://api.kentik.com/api/v5/customdimension'
    json_template = '''
    {
        "name": "{{ column }}",
        "type": "{{ data_type }}",
        "display_name": "{{ pretty_name }}"
    }
    '''
    t = Template(json_template)
    data = json.loads(t.render(column = col_name, data_type = type, pretty_name = display_name))
    response = requests.post(url, headers=headers, data=data)
    if response.status_code != 201:
        print("Unable to create custom dimension column. Exiting.")
        # .format() has to be applied to the string: print() returns None on
        # Python 3, so print("...").format(...) raised AttributeError there.
        print("Status code: {}".format(response.status_code))
        print("Error message: {}".format(response.json()['error']))
        exit()
    else:
        # Decode the body once and reuse the id for both message and return.
        dimension_id = response.json()['customDimension']['id']
        print("Custom dimension \"{}\" created as id: {}".format(display_name, dimension_id))
        return(dimension_id)


问题


面经


文章

微信
公众号

扫码关注公众号