python类dumps()的实例源码

util.py 文件源码 项目:henet 作者: AcrDijon 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_article(path, cache_dir, root_path):
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    # XXX two reads...
    with open(path) as f:
        file_md5 = md5(f.read())

    cache = os.path.join(cache_dir, file_md5)

    if os.path.exists(cache):
        with open(cache) as f:
            article = Article(bson.loads(f.read()))
    else:
        article = parse(path)
        article['filename'] = path[len(root_path):].lstrip('/')
        with open(cache, 'w') as f:
            f.write(bson.dumps(article))

    return article
renderers.py 文件源码 项目:baka 作者: baka-framework 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def json_dumps(o):
    return json.dumps(o, cls=JSONEncoder)
renderers.py 文件源码 项目:baka 作者: baka-framework 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def bson_dumps(o):
    return bson.dumps(o)
bson.py 文件源码 项目:serialize 作者: hgrecco 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def dumps(obj):
    if not isinstance(obj, dict):
        obj = dict(__bson_follow__=obj)
    return bson.dumps(all.traverse_and_encode(obj))
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, custom_codec_implementation=None):
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            # Use implementation from pymongo or from pybson
            import bson
            if hasattr(bson, 'BSON'):
                # pymongo
                self._loads = lambda raw: bson.BSON.decode(bson.BSON(raw))
                self._dumps = lambda msg: bytes(bson.BSON.encode(msg))
            else:
                # pybson
                self._loads = bson.loads
                self._dumps = bson.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dumps(self, msg):
        try:
            return self._dumps(msg)
        except Exception as e:
            raise EncodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def dumps(self, msg):
        try:
            return self._dumps(msg,
                               separators=(',', ':'),
                               sort_keys=True).encode('utf-8')
        except Exception as e:
            raise EncodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def put(self, item):
        '''
        Put item to queue -> codec -> socket.

        :param item: Message object.
        :type item: dict, list or None
        '''
        if self._closed:
            raise BsonRpcError('Attempt to put items to closed queue.')
        msg_bytes = self.codec.into_frame(self.codec.dumps(item))
        with self._lock:
            self.socket.sendall(msg_bytes)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dumps(self, msg):
        try:
            return self._dumps(msg)
        except Exception as e:
            raise EncodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, extractor, framer, custom_codec_implementation=None):
        self._extractor = extractor
        self._framer = framer
        if custom_codec_implementation is not None:
            self._loads = custom_codec_implementation.loads
            self._dumps = custom_codec_implementation.dumps
        else:
            import json
            self._loads = json.loads
            self._dumps = json.dumps
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def dumps(self, msg):
        try:
            return self._dumps(msg,
                               separators=(',', ':'),
                               sort_keys=True).encode('utf-8')
        except Exception as e:
            raise EncodingError(e)
socket_queue.py 文件源码 项目:py-bson-rpc 作者: seprich 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def put(self, item):
        '''
        Put item to queue -> codec -> socket.

        :param item: Message object.
        :type item: dict, list or None
        '''
        if self._closed:
            raise BsonRpcError('Attempt to put items to closed queue.')
        msg_bytes = self.codec.into_frame(self.codec.dumps(item))
        with self._lock:
            self.socket.sendall(msg_bytes)
dataframe_changes_bson.py 文件源码 项目:pcc 作者: Mondego 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def SerializeToString(self):
        return bson.dumps(self)


问题


面经


文章

微信
公众号

扫码关注公众号