python类loads()的实例源码

util.py 文件源码 项目:henet 作者: AcrDijon 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_article(path, cache_dir, root_path):
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    # XXX two reads...
    with open(path) as f:
        file_md5 = md5(f.read())

    cache = os.path.join(cache_dir, file_md5)

    if os.path.exists(cache):
        with open(cache) as f:
            article = Article(bson.loads(f.read()))
    else:
        article = parse(path)
        article['filename'] = path[len(root_path):].lstrip('/')
        with open(cache, 'w') as f:
            f.write(bson.dumps(article))

    return article
renderers.py 文件源码 项目:baka 作者: baka-framework 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def json_loads(s):
    return json.loads(s, parse_float=Decimal)
renderers.py 文件源码 项目:baka 作者: baka-framework 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def bson_loads(s):
    return bson.loads(s)
machine.py 文件源码 项目:drydock 作者: att-comdev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_details(self):
        url = self.interpolate_url()

        resp = self.api_client.get(url, op='details')

        if resp.status_code == 200:
            detail_config = bson.loads(resp.text)
            return detail_config
bson.py 文件源码 项目:serialize 作者: hgrecco 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def loads(content):
    obj = all.traverse_and_decode(bson.loads(content))
    return obj.get('__bson_follow__', obj)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, custom_codec_implementation=None):
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            # Use implementation from pymongo or from pybson
            import bson
            if hasattr(bson, 'BSON'):
                # pymongo
                self._loads = lambda raw: bson.BSON.decode(bson.BSON(raw))
                self._dumps = lambda msg: bytes(bson.BSON.encode(msg))
            else:
                # pybson
                self._loads = bson.loads
                self._dumps = bson.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def loads(self, b_msg):
        try:
            return self._loads(b_msg)
        except Exception as e:
            raise DecodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def loads(self, b_msg):
        try:
            return self._loads(b_msg.decode('utf-8'))
        except Exception as e:
            raise DecodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _to_queue(self, bbuffer):
        b_msg, bbuffer = self.codec.extract_message(bbuffer)
        while b_msg is not None:
            self._queue.put(self.codec.loads(b_msg))
            b_msg, bbuffer = self.codec.extract_message(bbuffer)
        return bbuffer
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def loads(self, b_msg):
        try:
            return self._loads(b_msg)
        except Exception as e:
            raise DecodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def loads(self, b_msg):
        try:
            return self._loads(b_msg.decode('utf-8'))
        except Exception as e:
            raise DecodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _to_queue(self, bbuffer):
        b_msg, bbuffer = self.codec.extract_message(bbuffer)
        while b_msg is not None:
            self._queue.put(self.codec.loads(b_msg))
            b_msg, bbuffer = self.codec.extract_message(bbuffer)
        return bbuffer
dataframe_changes_bson.py 文件源码 项目:pcc 作者: Mondego 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def ParseFromString(self, str_value):
        self.ParseFromDict(bson.loads(str_value))


问题


面经


文章

微信
公众号

扫码关注公众号