def parse_article(path, cache_dir, root_path):
    """Parse the article at ``path``, reusing a BSON cache keyed by content hash.

    The file's contents are passed to ``md5`` and the result is used as the
    cache file name inside ``cache_dir``. On a cache hit the stored BSON
    document is decoded and wrapped in ``Article``. On a miss the file is
    parsed with ``parse``, tagged with a ``'filename'`` entry and written to
    the cache.

    Args:
        path: Path of the article source file.
        cache_dir: Directory holding cached articles; created if missing.
        root_path: Prefix whose length is cut off the front of ``path`` to
            build ``article['filename']``.
            NOTE(review): assumes ``path`` starts with ``root_path`` -- confirm.

    Returns:
        The article, either loaded from the cache or freshly parsed.

    Raises:
        OSError: If ``cache_dir`` cannot be created and is not already a
            directory.
    """
    # Create the directory EAFP-style: a separate exists() check followed by
    # makedirs() races with any other process creating the same directory.
    try:
        os.makedirs(cache_dir)
    except OSError:
        if not os.path.isdir(cache_dir):
            raise
    # XXX two reads: the file is read here for hashing and presumably again
    # inside parse().
    with open(path) as f:
        # NOTE(review): md5 is presumably a helper returning a hex digest
        # string, since the result is used as a file name -- confirm.
        file_md5 = md5(f.read())
    cache = os.path.join(cache_dir, file_md5)
    if os.path.exists(cache):
        # BSON is a binary format, so the cache must be opened in binary mode;
        # text mode breaks on Python 3 and can mangle bytes on Windows.
        with open(cache, 'rb') as f:
            article = Article(bson.loads(f.read()))
    else:
        article = parse(path)
        article['filename'] = path[len(root_path):].lstrip('/')
        with open(cache, 'wb') as f:
            f.write(bson.dumps(article))
    return article
# Example source code using Python loads()
def json_loads(s):
    """Decode the JSON document ``s``, building ``Decimal`` for float literals."""
    decoded = json.loads(s, parse_float=Decimal)
    return decoded
def bson_loads(s):
    """Decode the BSON payload ``s`` by delegating to ``bson.loads``."""
    document = bson.loads(s)
    return document
def get_details(self):
    """Request the 'details' operation and return the BSON-decoded response.

    The URL comes from ``self.interpolate_url()`` and the request is issued
    through ``self.api_client.get``.

    Returns:
        The result of ``bson.loads`` on the response text when the status
        code is 200, otherwise ``None``.
    """
    endpoint = self.interpolate_url()
    response = self.api_client.get(endpoint, op='details')
    if response.status_code != 200:
        # Any non-200 reply yields no detail configuration.
        return None
    return bson.loads(response.text)
def loads(content):
    """Decode BSON ``content`` and unwrap an optional ``'__bson_follow__'`` entry.

    The decoded document is passed through ``all.traverse_and_decode``.
    NOTE(review): ``all`` is presumably a project module that shadows the
    builtin of the same name -- confirm.

    Returns:
        The value stored under ``'__bson_follow__'`` if present, otherwise the
        decoded object itself.
    """
    decoded = all.traverse_and_decode(bson.loads(content))
    unwrapped = decoded.get('__bson_follow__', decoded)
    return unwrapped
def __init__(self, custom_codec_implementation=None):
if custom_codec_implementation is not None:
self._loads = custom_codec_implementation.loads
self._dumps = custom_codec_implementation.dumps
else:
# Use implementation from pymongo or from pybson
import bson
if hasattr(bson, 'BSON'):
# pymongo
self._loads = lambda raw: bson.BSON.decode(bson.BSON(raw))
self._dumps = lambda msg: bytes(bson.BSON.encode(msg))
else:
# pybson
self._loads = bson.loads
self._dumps = bson.dumps
def loads(self, b_msg):
    """Decode ``b_msg`` with the configured ``_loads`` callable.

    Raises:
        DecodingError: Wrapping any exception raised while decoding.
    """
    try:
        decoded = self._loads(b_msg)
    except Exception as exc:
        raise DecodingError(exc)
    return decoded
def __init__(self, extractor, framer, custom_codec_implementation=None):
self._extractor = extractor
self._framer = framer
if custom_codec_implementation is not None:
self._loads = custom_codec_implementation.loads
self._dumps = custom_codec_implementation.dumps
else:
import json
self._loads = json.loads
self._dumps = json.dumps
def loads(self, b_msg):
    """Decode the UTF-8 bytes ``b_msg`` and parse them with ``_loads``.

    Raises:
        DecodingError: Wrapping any exception raised while decoding the bytes
            or parsing the resulting text.
    """
    try:
        text = b_msg.decode('utf-8')
        return self._loads(text)
    except Exception as exc:
        raise DecodingError(exc)
def _to_queue(self, bbuffer):
b_msg, bbuffer = self.codec.extract_message(bbuffer)
while b_msg is not None:
self._queue.put(self.codec.loads(b_msg))
b_msg, bbuffer = self.codec.extract_message(bbuffer)
return bbuffer
def loads(self, b_msg):
    """Run ``b_msg`` through the configured ``_loads`` callable.

    Raises:
        DecodingError: Wrapping any exception raised by ``_loads``.
    """
    try:
        result = self._loads(b_msg)
    except Exception as err:
        raise DecodingError(err)
    else:
        return result
def __init__(self, extractor, framer, custom_codec_implementation=None):
self._extractor = extractor
self._framer = framer
if custom_codec_implementation is not None:
self._loads = custom_codec_implementation.loads
self._dumps = custom_codec_implementation.dumps
else:
import json
self._loads = json.loads
self._dumps = json.dumps
def loads(self, b_msg):
    """Parse ``b_msg`` after decoding it from UTF-8 bytes to text.

    Raises:
        DecodingError: Wrapping any exception raised by the UTF-8 decode or
            by ``_loads``.
    """
    try:
        decoded_text = b_msg.decode('utf-8')
        parsed = self._loads(decoded_text)
        return parsed
    except Exception as err:
        raise DecodingError(err)
def _to_queue(self, bbuffer):
b_msg, bbuffer = self.codec.extract_message(bbuffer)
while b_msg is not None:
self._queue.put(self.codec.loads(b_msg))
b_msg, bbuffer = self.codec.extract_message(bbuffer)
return bbuffer
def ParseFromString(self, str_value):
    """Decode ``str_value`` with ``bson.loads`` and pass it to ``ParseFromDict``."""
    decoded = bson.loads(str_value)
    self.ParseFromDict(decoded)