python类loads()的实例源码

problem.py 文件源码 项目:xgovctf 作者: alphagov 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def insert_problem_from_json(blob):
    """
    Converts json blob of problem(s) into dicts. Runs insert_problem on each one.
    See insert_problem for more information.

    Returns:
        A list of the created problem pids if an array of problems is specified.
    """

    result = json_util.loads(blob)

    if type(result) == list:
        return [insert_problem(problem) for problem in result]
    elif type(result) == dict:
        return insert_problem(result)
    else:
        raise InternalException("JSON blob does not appear to be a list of problems or a single problem.")
module_bson.py 文件源码 项目:dataIO-project 作者: MacHu-GWU 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_all(self):
        data = {
            "int": 100,
            "float": 3.1415926535,
            "str": "string example ?????",
            "bytes": "bytes example ?????".encode("utf-8"),
            "boolean": True,
            "datetime": datetime.now()
        }
        js = json_util.dumps(data)

        data1 = json_util.loads(js)
        self.assertEqual(data["int"], data1["int"])
        self.assertAlmostEqual(data["float"], data1["float"], delta=0.0001)
        self.assertEqual(data["str"], data1["str"])
        self.assertEqual(data["boolean"], data1["boolean"])

        print(data1["bytes"])
        print(data1["datetime"])
        print(json_util.dumps(data, sort_keys=True, indent=4))


#--- Unittest ---
authentication.py 文件源码 项目:pypers 作者: frankosan 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def verify_token(username, token):
    """
    Verify validity of token
    """
    s = TimedJWSSerializer(app.config['SECRET_KEY'])

    try:
        ut.pretty_print("Trying to load the token")
        data = s.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False
    else:
        ut.pretty_print("Token successfully loaded")
        stored = db.sessions.find_one(filter={'username': data['username']}, sort=[('_id',-1)])

        if not stored:
            return False
        result = json_util.loads(json_util.dumps(stored))

        return pwd_context.verify(data['password'], result['password_hash']) and data['username'] == username
api_manager.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_json_objects(files):
    objects = []
    for f in files:
        contents = open(f, "r").read()
        data = json_util.loads(contents)
        if isinstance(data, list):
            objects += data
        elif isinstance(data, dict):
            objects.append(data)
        else:
            logging.warning("JSON file {} did not contain an object or list of objects".format(f))
    return objects
json_collection.py 文件源码 项目:smappdragon 作者: SMAPPNYU 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_iterator(self):
        tweet_parser = TweetParser()
        if self.compression == 'bz2':
            self.mode = binary_mode(self.mode)
            json_handle = bz2.open(self.filepath, self.mode, encoding=self.encoding)
        elif self.compression == 'gzip':
            self.mode = binary_mode(self.mode)
            json_handle = gzip.open(self.filepath, self.mode, encoding=self.encoding)
        else:    
            json_handle = open(self.filepath, self.mode, encoding=self.encoding)
        bad_lines = 0
        for count, tweet in enumerate(json_handle):
            if not self.throw_error:
                try:
                    tweet = json_util.loads(tweet)
                except:
                    bad_lines += 1
            else:
                tweet = json_util.loads(tweet)
            if self.limit != 0 and self.limit <= count:
                return
            elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
            and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
                if self.should_strip:
                    yield tweet_parser.strip_tweet(self.keep_fields, tweet)
                else:
                    yield tweet
        if self.verbose:
            print("{} rows are ok.".format(count - bad_lines))
            print("{} rows are corrupt.".format(bad_lines))
        json_handle.close()
rester.py 文件源码 项目:webtzite 作者: materialsproject 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _make_request(self, sub_url, payload=None, method="GET"):
        response = None
        url = self.preamble.replace('8000', '5000') + sub_url
        try:
            headers = {'Referer': self.preamble}
            if self.session.cookies.get('csrftoken') is None:
                from django.core.urlresolvers import reverse
                uri = urlparse.urlparse(self.preamble)
                domain = '{uri.scheme}://{uri.netloc}'.format(uri=uri).replace('8000', '5000')
                site_url = '/'.join(uri.path.split('/')[:-2]) # test_site/
                browserid_csrf = reverse('browserid.csrf')
                if site_url[:-1] not in browserid_csrf:
                    domain += site_url
                domain += browserid_csrf
                self.session.get(domain)
            headers["X-CSRFToken"] = self.session.cookies.get('csrftoken')
            response = self.session.request(method, url=url, headers=headers, data=payload)
            if response.status_code in [200, 400]:
                data = loads(response.text, json_options=JSONOptions(document_class=OrderedDict))
                if data["valid_response"]:
                    if data.get("warning"):
                        warnings.warn(data["warning"])
                    return data["response"]
                else:
                    raise MPResterError(data["error"])
            raise MPResterError(
                "REST query returned with error status code {}"
                .format(response.status_code)
            )
        except Exception as ex:
            msg = "{}. Content: {}".format(str(ex), repr(response.content)) \
                    if hasattr(response, "content") else str(ex)
            raise MPResterError(msg)
api_manager.py 文件源码 项目:picoCTF 作者: royragsdale 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_json_objects(files):
    objects = []
    for f in files:
        contents = open(f, "r").read()
        data = json_util.loads(contents)
        if isinstance(data, list):
            objects += data
        elif isinstance(data, dict):
            objects.append(data)
        else:
            logging.warning("JSON file {} did not contain an object or list of objects".format(f))
    return objects
api_manager.py 文件源码 项目:xgovctf 作者: alphagov 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_json_objects(files):
    objects = []
    for f in files:
        contents = open(f, "r").read()
        data = json_util.loads(contents)
        if isinstance(data, list):
            objects += data
        elif isinstance(data, dict):
            objects.append(data)
        else:
            logging.warning("JSON file {} did not contain an object or list of objects".format(f))
    return objects
autogen.py 文件源码 项目:xgovctf 作者: alphagov 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def read_metadata(pid, n):
    """
    Reads the metadata object for a given problem instance.

    Args:
        pid: the problem id
        n: the problem instance
    Returns:
        The metadata object
    """

    metadata_path = get_metadata_path(pid, n)
    with open(metadata_path, "r") as f:
        return json_util.loads(f.read())
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def bsonjs_loads(json_str):
    """Provide same API as json_util.loads"""
    return to_object(bsonjs.loads(json_str))
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def round_trip(self, doc):
        bson_bytes = to_bson(doc)
        self.assertEqual(bson_bytes, bsonjs.loads(bsonjs.dumps(bson_bytes)))
        # Check compatibility between bsonjs and json_util
        self.assertEqual(doc, json_util.loads(
            bsonjs.dumps(bson_bytes),
            json_options=json_util.STRICT_JSON_OPTIONS))
        self.assertEqual(bson_bytes, bsonjs.loads(json_util.dumps(
            doc, json_options=json_util.STRICT_JSON_OPTIONS)))
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_binary(self):
        bin_type_dict = {"bin": Binary(b"\x00\x01\x02\x03\x04")}
        md5_type_dict = {
            "md5": Binary(b" n7\x18\xaf\t/\xd1\xd1/\x80\xca\xe7q\xcc\xac",
                          MD5_SUBTYPE)
        }
        custom_type_dict = {"custom": Binary(b"hello", USER_DEFINED_SUBTYPE)}

        self.round_trip(bin_type_dict)
        self.round_trip(md5_type_dict)
        self.round_trip(custom_type_dict)

        json_bin_dump = bsonjs_dumps(md5_type_dict)
        # Order should be $binary then $type.
        self.assertEqual(
            ('{ "md5" : { "$binary" : "IG43GK8JL9HRL4DK53HMrA==", '
             '"$type" : "05" } }'),
            json_bin_dump)

        json_bin_dump = bsonjs_dumps(custom_type_dict)
        self.assertTrue('"$type" : "80"' in json_bin_dump)
        # Check loading invalid binary
        self.assertRaises(ValueError, bsonjs.loads,
                          '{"a": {"$binary": "invalid", "$type": "80"}}')
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_numberlong(self):
        json_str = '{"weight": {"$numberLong": "4611686018427387904"}}'
        self.round_trip(bsonjs_loads(json_str))
        self.assertEqual(bsonjs_loads(json_str)['weight'],
                         Int64(4611686018427387904))
        # Check loading invalid $numberLong
        self.assertRaises(ValueError, bsonjs.loads,
                          '{"a": {"$numberLong": 1}}')
        self.assertRaises(ValueError, bsonjs.loads,
                          '{"a": {"$numberLong": "not-a-number"}}')
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_load_mongodb_extended_type_at_top_level(self):
        self.assertRaises(ValueError, bsonjs.loads,
                          '{"$numberLong": "42"}')
        self.assertRaises(ValueError, bsonjs.loads,
                          '{"$numberLong": "42", "a": 1}')
        _ = bsonjs.loads('{"a": 1, "$numberLong": "42"}')
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_loads_multiple_json_documents(self):
        json_str = '{ "test" : "me" }'
        self.assertEqual(bsonjs.loads(json_str), bsonjs.loads(json_str + "{}"))
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_dump_basic(self):
        json_str = '{ "test" : "me" }'
        bson_bytes = bsonjs.loads(json_str)
        filep = StringIO()
        bsonjs.dump(bson_bytes, filep)
        filep.seek(0)
        self.assertEqual(json_str, filep.read())
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_dump_throws_no_write_attribute(self):
        bson_bytes = bsonjs.loads('{ "test" : "me" }')
        not_file = {}
        self.assertRaises(AttributeError, bsonjs.dump, bson_bytes, not_file)
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_load_basic(self):
        json_str = '{ "test" : "me" }'
        filep = StringIO(json_str)
        self.assertEqual(bsonjs.loads(json_str), bsonjs.load(filep))
test_bsonjs.py 文件源码 项目:python-bsonjs 作者: mongodb-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_load_unicode(self):
        json_unicode = u'{ "test" : "me" }'

        class UnicodeRead(object):
            def read(self):
                return json_unicode
        self.assertEqual(bsonjs.loads(json_unicode),
                         bsonjs.load(UnicodeRead()))
utils.py 文件源码 项目:django-audit-tools 作者: PeRDy 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def parse_request_meta(meta):
    """Parses request metadata from string or dict.

    :param meta: request.META
    :type meta: str
    :return: Parsed request metadata
    :rtype: dict
    :raise: ValueError
    """
    metadata = meta
    # Escape double quotes
    meta = unicode(metadata)
    meta = meta.replace(r'"', r'\"')
    meta = re.sub(r'\\"(.*?)\'(.*?)\'(.*?)\\"', r"""'\1\\"\2\\"\3'""", meta)
    # Change single quote with double quote
    meta = re.sub(r"u?\'(.*?)\'", r'"\1"', meta)
    # Remove TERMCAP and LS_COLORS fields
    meta = re.sub(r'"TERMCAP": ".*?",\n', "", meta, re.DOTALL)
    meta = re.sub(r'"LS_COLORS": ".*?",\n', "", meta, re.DOTALL)
    # Change tuples () with lists []
    meta = re.sub(r": \((.+?,.*?)\)", r": [\1]", meta)
    # Change objects <> with empty strings ""
    meta = re.sub(r"<(\w+)(.*?)>(,|\})", r'"<\1>"\3', meta)
    # False and True to lowercase
    meta = meta.replace("False", "false")
    meta = meta.replace("True", "true")
    # Change points . with underscores _ in keys
    meta = re.sub(r'(".+?":)(.+?(,|\}))', lambda m: m.group(1).replace(".", "_") + m.group(2), meta)
    # Parse json
    meta_dict = loads(meta)

    return filter_request_meta(meta_dict)
middleware.py 文件源码 项目:django-audit-tools 作者: PeRDy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _extract_response_data(self, response):
        """Extract response data.

        :param response: Django http response object.
        :type response: django.http.HttpResponse
        :return: Extracted data.
        :rtype: dict
        """
        try:
            content_type = response.get('Content-Type', '')
            ct = content_type.lower()

            if 'json' in ct:
                response_content = loads(response._container[0])
            elif 'xml' in ct:
                response_content = response._container[0].decode('utf-8', errors='ignore')
            else:
                response_content = None
        except:
            response_content = None

        return {
            'content': fix_dict(response_content),
            'type': response.get('Content-Type', ''),
            'status_code': response.status_code,
            }
worker-manager.py 文件源码 项目:worker-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def rabbit_work_function(ch, method, properties, body):
    try:
        # check the message body to get the needed order
        app_json = loads(body)
        # if it's blank stop containers and kill worker-manger container
        if len(app_json) == 0:
            stop_containers(app_json)
            print "got a blank massage from rabbit - likely app wasn't created in nebula API yet, dropping container"
            os._exit(2)
        # elif it's stopped stop containers
        elif app_json["command"] == "stop":
            stop_containers(app_json)
        # if it's start start containers
        elif app_json["command"] == "start":
            start_containers(app_json, False, registry_auth_user, registry_auth_password, registry_host)
        # if it's roll rolling restart containers
        elif app_json["command"] == "roll":
            roll_containers(app_json, registry_auth_user, registry_auth_password, registry_host)
        # elif restart containers
        else:
            restart_containers(app_json, registry_auth_user, registry_auth_password, registry_host)
        # ack message
        rabbit_ack(ch, method)
    except pika.exceptions.ConnectionClosed as e:
        print >> sys.stderr, e
        print "lost rabbitmq connection mid transfer - dropping container to be on the safe side"
        os._exit(2)


# recursive so it will always keep trying to reconnect to rabbit in case of any connection issues
metadata.py 文件源码 项目:CSH_image_labeling 作者: Paprika69 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def post(self):
        frm = request.form
        condition = {}
        fields = ('width', 'height', 'size')
        # generate query condition
        for field in fields:
            field_condition = self.__get_condition(field, frm)
            if field_condition is not None:
                condition[field] = field_condition

        num = int(frm['num'])
        total = tran.Image.find().count()
        non_text = tran.Image.find({'segmentCharacteristics.segmentType': 'nonText'}).count()
        multi_line = tran.Image.find({'segmentCharacteristics.segmentType': 'multiLine'}).count()
        word = tran.Image.find({'segmentCharacteristics.segmentType': 'word'}).count()
        words = tran.Image.find({'segmentCharacteristics.segmentType': 'words'}).count()
        partial_word = tran.Image.find({'segmentCharacteristics.segmentType': 'partialWord'}).count()
        distinct_word = len(
            tran.Image.find({'segmentCharacteristics.segmentType': 'word'}).distinct('segmentCharacteristics.label'))
        # get matched records
        records = tran.Image.find(condition).limit(num)
        db.close()
        result = {
            "total": total,
            "data": loads(dumps(records)),
            "stat": "total({0}), word({1}), "
                    "distinct_word({6}), partial word({2}), multiple lines({3}), not a word({4}), words({5})"
                .format(total, word, partial_word, multi_line, non_text, words, distinct_word)
        }
        return result, 200
__init__.py 文件源码 项目:pigroom 作者: youjiajia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        Redis.flushall()
        if app.config.get('MONGO_DBNAME') in client.database_names():
            client.drop_database(app.config.get('MONGO_DBNAME'))

        print 'Create database(%s) for unittest ... ok' % (app.config.get('MONGO_DBNAME'))
        kw_path = app.config.get('BASE_DIR') or os.path.abspath(os.path.join(app.root_path, '../'))
        data_path = os.path.join(kw_path, 'wanx/tests/fixtures/')
        for fname in os.listdir(data_path):
            if not fname.startswith('.') and fname.endswith('.json'):
                collection = fname.split('.')[0]
                with open(os.path.join(data_path, fname), 'r') as f:
                    data = f.read()
                    if data:
                        data = bjson.loads(data.strip('\n'))
                        DB.create_collection(collection)
                        DB.get_collection(collection).insert_many(data)

        print 'Create mysql database(%s) for unittest ... ok' % (MYDB.database)
        sql = 'mysql -e "create database if not exists %s  DEFAULT CHARACTER SET utf8 \
               DEFAULT COLLATE utf8_general_ci;"' % (MYDB.database)
        os.popen(sql)
        for fname in os.listdir(data_path):
            if not fname.startswith('.') and fname.endswith('.sql'):
                sql_path = os.path.join(data_path, fname)
                sql = 'mysql %s < %s' % (MYDB.database, sql_path)
                os.popen(sql)
worker-manager.py 文件源码 项目:nebula 作者: nebula-orchestrator 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def rabbit_work_function(ch, method, properties, body):
    try:
        # check the message body to get the needed order
        app_json = loads(body)
        # if it's blank stop containers and kill worker-manger container
        if len(app_json) == 0:
            stop_containers(app_json)
            print "got a blank massage from rabbit - likely app wasn't created in nebula API yet, dropping container"
            os._exit(2)
        # elif it's stopped stop containers
        elif app_json["command"] == "stop":
            stop_containers(app_json)
        # if it's start start containers
        elif app_json["command"] == "start":
            start_containers(app_json, False, registry_auth_user, registry_auth_password, registry_host)
        # if it's roll rolling restart containers
        elif app_json["command"] == "roll":
            roll_containers(app_json, registry_auth_user, registry_auth_password, registry_host)
        # elif restart containers
        else:
            restart_containers(app_json, registry_auth_user, registry_auth_password, registry_host)
        # ack message
        rabbit_ack(ch, method)
    except pika.exceptions.ConnectionClosed:
        print "lost rabbitmq connection mid transfer - dropping container to be on the safe side"
        os._exit(2)
DB_API.py 文件源码 项目:StockRecommendSystem 作者: doncat99 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def writeToCollection(collection, df, id = None):
    jsonStrings = df.to_json(orient='records')
    bsonStrings = json_util.loads(jsonStrings)
    for string in bsonStrings:
        if id is not None:
            id_string = ''.join([string[item] for item in id])
            string['_id'] = id_string
        collection.save(string)
DB_API.py 文件源码 项目:StockRecommendSystem 作者: doncat99 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def writeToCollectionExtend(collection, symbol, df, metadata=None):
    jsonStrings = {"_id":symbol, "symbol":symbol, "data":df.to_json(orient='records'), "metadata":metadata}
    #bsonStrings = json_util.loads(jsonStrings)
    collection.save(jsonStrings)
controllers.py 文件源码 项目:ai-chatbot-framework 作者: alfredfrancis 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def updateStory(storyId):
    jsondata = loads(request.get_data())
    story = Story.objects.get(id=ObjectId(storyId))
    story = update_document(story, jsondata)
    story.save()
    return 'success', 200
base.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def from_json(self, json_data):
        """Converts json data to unsaved objects"""
        son_data = json_util.loads(json_data)
        return [self._document._from_son(data, only_fields=self.only_fields) for data in son_data]
document.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_json(cls, json_data, created=False):
        """Converts json data to an unsaved document instance"""
        return cls._from_son(json_util.loads(json_data), created=created)


问题


面经


文章

微信
公众号

扫码关注公众号