Examples of Python loads() in real project source code

http.py (project: dsq, author: baverman)
def push(self, environ):
        ct = environ.get('CONTENT_TYPE')
        stream = environ['wsgi.input']
        content = stream.read(int(environ['CONTENT_LENGTH']))
        if ct == 'application/json':
            try:
                task = json.loads(content if PY2 else content.decode('utf-8'))
            except Exception:
                return Error('400 BAD REQUEST', 'invalid-encoding', 'Can\'t decode body')
        elif ct == 'application/x-msgpack':
            try:
                # note: msgpack >= 1.0 removed encoding=; there the call is msgpack.loads(content, raw=False)
                task = msgpack.loads(content, encoding='utf-8')
            except Exception:
                return Error('400 BAD REQUEST', 'invalid-encoding', 'Can\'t decode body')
        else:
            return Error('400 BAD REQUEST', 'invalid-content-type',
                         'Content must be json or msgpack')

        if not task.get('queue'):
            return Error('400 BAD REQUEST', 'bad-params', 'queue required')

        if not task.get('name'):
            return Error('400 BAD REQUEST', 'bad-params', 'name required')

        return {'id': self.manager.push(**task).id}
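
The PY2 guard above exists because json.loads() on Python 2 accepts bytes directly, while Python 3 expects text. A minimal sketch of the same decode step in isolation (PY2 is recomputed here, since the snippet's import block is not shown):

import json
import sys

PY2 = sys.version_info[0] == 2

def decode_json_body(content):
    # content arrives from wsgi.input as bytes
    return json.loads(content if PY2 else content.decode('utf-8'))

print(decode_json_body(b'{"queue": "normal", "name": "add"}'))
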
util.py (project: python-, author: secondtonone1)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
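
The commented-out lines show the two-step alternative; json.load() consumes a file-like object directly, and the codecs reader decodes the byte stream on the fly. A minimal sketch of the same pattern against an in-memory stream:

import codecs
import io
import json

raw = io.BytesIO(b'{"status": "ok"}')       # stands in for the HTTP response body
reader = codecs.getreader('utf-8')(raw)     # decodes bytes to text incrementally
print(json.load(reader))                    # {'status': 'ok'}
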
test_basic.py (project: python-, author: secondtonone1)
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # context manager so the file handle is closed promptly
        with open(filename, 'rb') as f:
            return json.loads(f.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
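
ZipFile.read() returns raw bytes, hence the explicit decode before json.loads(). A self-contained sketch of that pattern with an in-memory, wheel-like zip (the file names here are made up):

import io
import json
from zipfile import ZipFile

buf = io.BytesIO()
with ZipFile(buf, 'w') as zf:
    zf.writestr('pkg-1.0.dist-info/metadata.json', json.dumps({'name': 'pkg'}))

with ZipFile(buf) as whl:
    for entry in whl.infolist():
        if entry.filename.endswith('/metadata.json'):
            print(json.loads(whl.read(entry).decode('utf-8')))
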
test_keys.py (project: python-, author: secondtonone1)
def test_load_save(self):
        self.wk.data = json.loads(wheel_json)

        self.wk.add_signer('+', '67890')
        self.wk.add_signer('scope', 'abcdefg')

        self.wk.trust('epocs', 'gfedcba')
        self.wk.trust('+', '12345')

        self.wk.save()

        del self.wk.data
        self.wk.load()

        signers = self.wk.signers('scope')
        self.assertTrue(signers[0] == ('scope', 'abcdefg'), self.wk.data['signers'])
        self.assertTrue(signers[1][0] == '+', self.wk.data['signers'])

        trusted = self.wk.trusted('epocs')
        self.assertTrue(trusted[0] == ('epocs', 'gfedcba'))
        self.assertTrue(trusted[1][0] == '+')

        self.wk.untrust('epocs', 'gfedcba')
        trusted = self.wk.trusted('epocs')
        self.assertTrue(('epocs', 'gfedcba') not in trusted)
data_preprocessing.py (project: AVSR-Deep-Speech, author: pandeydivesh15)
def read_json_file(file_path):
    '''
    Args:
        1. file_path:   Path to a JSON file, formatted like
                        https://gist.github.com/pandeydivesh15/2012ab10562cc85e796e1f57554aca33
    Returns:
        data:   A list of dicts. Each dict contains timing info for a spoken word (or punctuation mark).
    '''

    with open(file_path, 'r') as f:
        data = json.loads(f.read())['words']

        # for line in f:
        #   temp = json.loads(line)
        #   temp['start'] = None if temp['start'] == 'NA' else float(temp['start'])
        #   temp['end'] = None if temp['end'] == 'NA' else float(temp['end'])
        #   try:
        #       temp['word'] = temp['word'].encode('ascii')
        #   except KeyError:
        #       temp['punctuation'] = temp['punctuation'].encode('ascii')               
        #   data.append(temp)

    return data
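
A sketch of the expected file layout, with made-up values in the gist's shape:

import json

sample = '{"words": [{"word": "hello", "start": 0.28, "end": 0.52}, {"word": "world", "start": 0.61, "end": 0.95}]}'
data = json.loads(sample)['words']
print(data[0]['word'], data[0]['end'])      # hello 0.52
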
train.py (project: Structured-Self-Attentive-Sentence-Embedding, author: ExplorerFreda)
def package(data, volatile=False):
    """Package data for training / evaluation."""
    # list comprehensions instead of map(): under Python 3, map() returns a
    # one-shot iterator, which would break the len() and indexing used below
    data = [json.loads(x) for x in data]
    dat = [[dictionary.word2idx[y] for y in x['text']] for x in data]
    maxlen = 0
    for item in dat:
        maxlen = max(maxlen, len(item))
    targets = [x['label'] for x in data]
    maxlen = min(maxlen, 500)
    for i in range(len(data)):
        if maxlen < len(dat[i]):
            dat[i] = dat[i][:maxlen]
        else:
            for j in range(maxlen - len(dat[i])):
                dat[i].append(dictionary.word2idx['<pad>'])
    dat = Variable(torch.LongTensor(dat), volatile=volatile)
    targets = Variable(torch.LongTensor(targets), volatile=volatile)
    return dat.t(), targets
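
Variable(..., volatile=True) is pre-0.4 PyTorch; current versions dropped the flag in favor of the torch.no_grad() context. A minimal sketch of the modern equivalent:

import torch

x = torch.ones(3, requires_grad=True)
with torch.no_grad():          # replaces Variable(..., volatile=True)
    y = x * 2
print(y.requires_grad)         # False: no graph is recorded inside the block
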
auto_isolate_from_watchlist.py (project: cbapi-examples, author: cbcommunity)
def get_watchlist_id_by_name(watchlistsdict):
    """
    For each watchlist name specified in the config file, find the
    associated watchlist ID.
    NOTE: We trigger on watchlist IDs, and not on watchlist names
    """
    global cbtoken
    global cbserver

    headers = {'X-AUTH-TOKEN': cbtoken}

    r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
                      headers=headers,
                      verify=False)

    parsed_json = json.loads(r.text)

    for watchlist in parsed_json:
        for key, value in watchlistsdict.items():   # items(), not iteritems(), for Python 3
            if watchlist['name'].lower() == key.lower():
                watchlistsdict[key] = watchlist['id']
auto_blacklist_from_watchlist.py (project: cbapi-examples, author: cbcommunity)
def get_watchlist_id_by_name(watchlistsdict):
    """
    For each watchlist name specified in the config file, find the
    associated watchlist ID.
    NOTE: We trigger on watchlist IDs, and not on watchlist names
    """

    headers = {'X-AUTH-TOKEN': cbtoken}

    r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
                     headers=headers,
                     verify=False)

    parsed_json = json.loads(r.text)

    for watchlist in parsed_json:
        for key, value in watchlistsdict.items():   # items(), not iteritems(), for Python 3
            if watchlist['name'].lower() == key.lower():
                watchlistsdict[key] = int(watchlist['id'])
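
json.loads(r.text) works, but requests also exposes the same parse as r.json(). The core of the loop, runnable against a hand-written payload in the shape the watchlist API returns (names and IDs are illustrative):

import json

payload = '[{"name": "Newly Loaded Binaries", "id": 7}]'
watchlistsdict = {'newly loaded binaries': None}
for watchlist in json.loads(payload):
    for key, value in watchlistsdict.items():
        if watchlist['name'].lower() == key.lower():
            watchlistsdict[key] = int(watchlist['id'])
print(watchlistsdict)          # {'newly loaded binaries': 7}
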
json_format.py (project: ios-xr-grpc-python, author: cisco-grpc-connection-libs)
def Parse(text, message, ignore_unknown_fields=False):
  """Parses a JSON representation of a protocol message into a message.
  Args:
    text: Message JSON representation.
    message: A protocol buffer message to merge into.
    ignore_unknown_fields: If True, do not raise errors for unknown fields.
  Returns:
    The same message passed as argument.
  Raises:
    ParseError: On JSON parsing problems.
  """
  if not isinstance(text, six.text_type): text = text.decode('utf-8')
  try:
    if sys.version_info < (2, 7):
      # object_pair_hook is not supported before python2.7
      js = json.loads(text)
    else:
      js = json.loads(text, object_pairs_hook=_DuplicateChecker)
  except ValueError as e:
    raise ParseError('Failed to load JSON: {0}.'.format(str(e)))
  return ParseDict(js, message, ignore_unknown_fields)
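
_DuplicateChecker is not shown in this excerpt; an object_pairs_hook receives each JSON object's key/value pairs in order, so a duplicate check can be as small as the stand-in below (my sketch, not the protobuf implementation):

import json

def duplicate_checker(pairs):
    result = {}
    for key, value in pairs:
        if key in result:
            raise ValueError('Duplicate field: {0}'.format(key))
        result[key] = value
    return result

json.loads('{"a": 1, "b": 2}', object_pairs_hook=duplicate_checker)    # fine
# json.loads('{"a": 1, "a": 2}', object_pairs_hook=duplicate_checker) would raise ValueError
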
route_policy.py (project: ios-xr-grpc-python, author: cisco-grpc-connection-libs)
def multiple_policies(self, policies, neighbor):
        """Creates a new policy that applies list of policies to it.
        :param policies: list of policies that you want applied to a single policy
        :param neighbor: the neighbor you are going to apply these policies (used for naming)
        :type policies: list
        :type neighbor: str
        :return:  Name of the policy that is created
        :rtype: str
        """
        policy_name = neighbor.replace('.', '_')
        policy_name = 'multi_policy_' + policy_name
        shell = '{"openconfig-routing-policy:routing-policy": {"policy-definitions": {"policy-definition": [{"name": "%s","statements": {"statement": []}}]}}}' % policy_name
        shell = json.loads(shell, object_pairs_hook=OrderedDict)
        conditions = shell['openconfig-routing-policy:routing-policy']['policy-definitions']['policy-definition'][0]['statements']['statement']
        for policy in policies:
            policy_nm = 'Policy_' + policy
            json_policy = '{"name": "%s", "conditions": {"call-policy": "%s"}}' % (policy_nm, policy)
            json_policy = json.loads(json_policy, object_pairs_hook=OrderedDict)
            conditions.append(json_policy)
        multi_policy = json.dumps(shell)
        print(self.merge_config(multi_policy))
        return policy_name
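
object_pairs_hook=OrderedDict keeps key order stable through the loads/dumps round trip, so the generated policy serializes in the order the YANG model expects. A minimal sketch (since Python 3.7 plain dicts also preserve insertion order, but OrderedDict makes the intent explicit):

import json
from collections import OrderedDict

text = '{"name": "multi_policy_X", "statements": {"statement": []}}'
shell = json.loads(text, object_pairs_hook=OrderedDict)
shell['statements']['statement'].append({'name': 'Policy_A', 'conditions': {'call-policy': 'A'}})
print(json.dumps(shell))       # keys come back in their original order
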
cookie_tool.py (project: docklet, author: unias)
def parse_cookie(cookie, securekey):
    logger.info (">> parse cookie : %s" % cookie)
    parts = cookie.split('.')
    part1 = parts[0]
    part2 = '' if len(parts) < 2 else parts[1]
    try:
        text = str(base64.b64decode(part1.encode('ascii')), encoding='utf-8')
    except Exception:
        logger.info ("decode cookie failed")
        return None
    logger.info ("cookie content : %s" % text)
    thatpart2 = hashlib.md5((text+securekey).encode('ascii')).hexdigest()
    logger.info ("hash from part1 : %s" % thatpart2)
    logger.info ("hash from part2 : %s" % part2)
    if part2 == thatpart2:
        result = json.loads(text)['name']
    else:
        result = None
    logger.info ("parse from cookie : %s" % result)
    return result
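
The parser implies the cookie layout: base64(json payload) + '.' + md5(payload + securekey). A matching generator, inferred from the code above rather than taken from the project (for new designs, an HMAC would be the stronger integrity check than bare md5):

import base64
import hashlib
import json

def make_cookie(payload, securekey):
    text = json.dumps(payload)
    part1 = base64.b64encode(text.encode('ascii')).decode('ascii')
    part2 = hashlib.md5((text + securekey).encode('ascii')).hexdigest()
    return part1 + '.' + part2

print(make_cookie({'name': 'alice'}, 'secret'))
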
upgrade.py (project: docklet, author: unias)
def name_error():
    quotafile = open(fspath+"/global/sys/quotainfo", 'r')
    quotas = json.loads(quotafile.read())
    quotafile.close()
    if quotas['default'] == 'fundation':
        quotas['default'] = 'foundation'
    quotafile = open(fspath+"/global/sys/quotainfo",'w')
    quotafile.write(json.dumps(quotas)) 
    quotafile.close()

    groupfile = open(fspath+"/global/sys/quota", 'r')
    groups = json.loads(groupfile.read())
    groupfile.close()
    for group in groups:
        if group['name'] == 'fundation':
            group['name'] = 'foundation'
    groupfile = open(fspath+"/global/sys/quota",'w')
    groupfile.write(json.dumps(groups)) 
    groupfile.close()

    users = User.query.filter_by(user_group = 'fundation').all()
    for user in users:
        user.user_group = 'foundation'
    db.session.commit()
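
The open/read/close then open/write/close round trip recurs throughout the docklet snippets below; with-blocks express the same read-modify-write more safely. A self-contained sketch (quota.json is a stand-in path, seeded first so the example runs end to end):

import json

with open('quota.json', 'w') as f:
    json.dump([{'name': 'fundation'}], f)   # seed a stand-in quota file

with open('quota.json', 'r') as f:
    groups = json.load(f)                   # same result as json.loads(f.read())

for group in groups:
    if group['name'] == 'fundation':
        group['name'] = 'foundation'

with open('quota.json', 'w') as f:
    json.dump(groups, f)
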
vclustermgr.py (project: docklet, author: unias)
def recover_allclusters(self):
        logger.info("recovering all vclusters for all users...")
        usersdir = self.fspath+"/global/users/"
        auth_key = env.getenv('AUTH_KEY')
        res = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
        #logger.info(res)
        groups = json.loads(res['groups'])
        quotas = {}
        for group in groups:
            #logger.info(group)
            quotas[group['name']] = group['quotas']
        for user in os.listdir(usersdir):
            for cluster in self.list_clusters(user)[1]:
                logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
                #res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
                recover_info = post_to_user("/master/user/recoverinfo/", {'username':user,'auth_key':auth_key})
                uid = recover_info['uid']
                groupname = recover_info['groupname']
                input_rate_limit = quotas[groupname]['input_rate_limit']
                output_rate_limit = quotas[groupname]['output_rate_limit']
                self.recover_cluster(cluster, user, uid, input_rate_limit, output_rate_limit)
        logger.info("recovered all vclusters for all users")
vclustermgr.py (project: docklet, author: unias)
def get_clustersetting(self, clustername, username, containername, allcontainer):
        clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
        if not os.path.isfile(clusterpath):
            logger.error("cluster file: %s not found" % clustername)
            return [False, "cluster file not found"]
        infofile = open(clusterpath, 'r')
        info = json.loads(infofile.read())
        infofile.close()
        cpu = 0
        memory = 0
        disk = 0
        if allcontainer:
            for container in info['containers']:
                if 'setting' in container:
                    cpu += int(container['setting']['cpu'])
                    memory += int(container['setting']['memory'])
                    disk += int(container['setting']['disk'])
        else:
            for container in info['containers']:
                if container['containername'] == containername:
                    if 'setting' in container:
                        cpu += int(container['setting']['cpu'])
                        memory += int(container['setting']['memory'])
                        disk += int(container['setting']['disk'])
        return [True, {'cpu':cpu, 'memory':memory, 'disk':disk}]
container.py (project: docklet, author: unias)
def diff_containers(self):
        [status, localcontainers] = self.list_containers()
        globalpath = self.fspath+"/global/users/"
        users = os.listdir(globalpath)
        globalcontainers = []
        for user in users:
            clusters = os.listdir(globalpath+user+"/clusters")
            for cluster in clusters:
                clusterfile = open(globalpath+user+"/clusters/"+cluster, 'r')
                clusterinfo = json.loads(clusterfile.read())
                for container in clusterinfo['containers']:
                    if container['host'] == self.addr:
                        globalcontainers.append(container['containername'])
        both = []
        onlylocal = []
        onlyglobal = []
        for container in localcontainers:
            if container in globalcontainers:
                both.append(container)
            else:
                onlylocal.append(container)
        for container in globalcontainers:
            if container not in localcontainers:
                onlyglobal.append(container)
        return [both, onlylocal, onlyglobal]
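
The three result lists are plain set algebra; an equivalent sketch with set operations (sets do not preserve the iteration order that the list version above keeps):

local = {'u1-c1', 'u1-c2', 'u2-c1'}
global_ = {'u1-c2', 'u2-c1', 'u3-c1'}
both = sorted(local & global_)         # ['u1-c2', 'u2-c1']
onlylocal = sorted(local - global_)    # ['u1-c1']
onlyglobal = sorted(global_ - local)   # ['u3-c1']
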
userManager.py (project: docklet, author: unias)
def groupList(*args, **kwargs):
        '''
        Usage: list(cur_user = token_from_auth)
        List all groups for an administrator
        '''
        groupfile = open(fspath+"/global/sys/quota",'r')
        groups = json.loads(groupfile.read())
        groupfile.close()
        quotafile = open(fspath+"/global/sys/quotainfo",'r')
        quotas = json.loads(quotafile.read())
        quotafile.close()
        result = {
            "success": 'true',
            "groups": groups,
            "quotas": quotas['quotainfo'],
            "default": quotas['default'],
        }
        return result
userManager.py (project: docklet, author: unias)
def groupQuery(self, *args, **kwargs):
        '''
        Usage: groupQuery(name = XXX, cur_user = token_from_auth)
        List a group for an administrator
        '''
        groupfile = open(fspath+"/global/sys/quota",'r')
        groups = json.loads(groupfile.read())
        groupfile.close()
        for group in groups:
            if group['name'] == kwargs['name']:
                result = {
                    "success":'true',
                    "data": group['quotas'],
                }
                return result
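        # for/else: this else belongs to the for loop and runs only when no
        # group matched the requested name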
        else:
            return {"success":False, "reason":"Group does not exist"}
userManager.py (project: docklet, author: unias)
def quotaadd(*args, **kwargs):
        form = kwargs.get('form')
        quotaname = form.get("quotaname")
        default_value = form.get("default_value")
        hint = form.get("hint")
        if quotaname is None:
            return { "success":'false', "reason": "Empty quota name"}
        if default_value is None:
            default_value = "--"
        groupfile = open(fspath+"/global/sys/quota",'r')
        groups = json.loads(groupfile.read())
        groupfile.close()
        for group in groups:
            group['quotas'][quotaname] = default_value
        groupfile = open(fspath+"/global/sys/quota",'w')
        groupfile.write(json.dumps(groups))
        groupfile.close()
        quotafile = open(fspath+"/global/sys/quotainfo",'r')
        quotas = json.loads(quotafile.read())
        quotafile.close()
        quotas['quotainfo'].append({'name':quotaname, 'hint':hint})
        quotafile = open(fspath+"/global/sys/quotainfo",'w')
        quotafile.write(json.dumps(quotas))
        quotafile.close()
        return {"success":'true'}
monitor.py (project: docklet, author: unias)
def get_billing_history(vnode_name):
    clusters_dir = env.getenv("FS_PREFIX")+"/global/users/"+get_owner(vnode_name)+"/clusters/"
    if os.path.exists(clusters_dir):
        clusters = os.listdir(clusters_dir)
        for cluster in clusters:
            clusterpath = clusters_dir + cluster
            if not os.path.isfile(clusterpath):
                continue
            infofile = open(clusterpath, 'r')
            info = json.loads(infofile.read())
            infofile.close()
            if 'billing_history' not in info or vnode_name not in info['billing_history']:
                continue
            return info['billing_history'][vnode_name]
    # no billing history recorded for this vnode: return zeroed defaults
    return {'cpu': 0, 'mem': 0, 'disk': 0, 'port': 0}

# the thread to collect data from each worker and store them in monitor_hosts and monitor_vnodes

