def test_pickle(self):
    # ticket #135
    import pickle
    # Round-trip both an unnamed and a named timezone pair through every
    # pickle protocol and check equality is preserved.
    pairs = [
        [FixedOffsetTimezone(60), FixedOffsetTimezone(120)],
        [FixedOffsetTimezone(60, name='foo'),
         FixedOffsetTimezone(120, name='bar')],
    ]
    for pair in pairs:
        for proto in [-1, 0, 1, 2]:
            restored = pickle.loads(pickle.dumps(pair, proto))
            for original, clone in zip(pair, restored):
                self.assertEqual(original, clone)
# Example snippets demonstrating uses of loads()
def test_no_conn_curs(self):
    """register_json() with explicit oids works without conn/curs args."""
    from psycopg2._json import _get_json_oids
    oid, array_oid = _get_json_oids(self.conn)

    # Save the typecasters currently registered for the json oids so we
    # can restore them afterwards.  Use the oids just looked up rather
    # than hard-coded values, so the test also works on installations
    # where json has non-default oids.
    old = psycopg2.extensions.string_types.get(oid)
    olda = psycopg2.extensions.string_types.get(array_oid)

    def loads(s):
        # Parse floats as Decimal so we can tell our loader was used.
        return psycopg2.extras.json.loads(s, parse_float=Decimal)

    try:
        new, newa = psycopg2.extras.register_json(
            loads=loads, oid=oid, array_oid=array_oid)
        curs = self.conn.cursor()
        curs.execute("""select '{"a": 100.0, "b": null}'::json""")
        data = curs.fetchone()[0]
        self.assert_(isinstance(data['a'], Decimal))
        self.assertEqual(data['a'], Decimal('100.0'))
    finally:
        # Drop the casters we registered and restore the previous ones.
        psycopg2.extensions.string_types.pop(new.values[0])
        psycopg2.extensions.string_types.pop(newa.values[0])
        if old:
            psycopg2.extensions.register_type(old)
        if olda:
            psycopg2.extensions.register_type(olda)
def test_register_default(self):
    """register_default_json() applies a custom loads to json and json[]."""
    curs = self.conn.cursor()

    def loads(s):
        return psycopg2.extras.json.loads(s, parse_float=Decimal)

    psycopg2.extras.register_default_json(curs, loads=loads)

    # Scalar json column uses the custom loader.
    curs.execute("""select '{"a": 100.0, "b": null}'::json""")
    record = curs.fetchone()[0]
    self.assert_(isinstance(record['a'], Decimal))
    self.assertEqual(record['a'], Decimal('100.0'))

    # So does a json[] array column.
    curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
    record = curs.fetchone()[0]
    self.assert_(isinstance(record[0]['a'], Decimal))
    self.assertEqual(record[0]['a'], Decimal('100.0'))
def test_loads(self):
    # A custom loads registered for jsonb must not affect plain json.
    json = psycopg2.extras.json
    def loads(s):
        # Parse floats as Decimal so we can tell our loader was used.
        return json.loads(s, parse_float=Decimal)
    psycopg2.extras.register_json(self.conn, loads=loads, name='jsonb')
    curs = self.conn.cursor()
    curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
    data = curs.fetchone()[0]
    self.assert_(isinstance(data['a'], Decimal))
    self.assertEqual(data['a'], Decimal('100.0'))
    # make sure we are not mangling plain json too
    curs.execute("""select '{"a": 100.0, "b": null}'::json""")
    data = curs.fetchone()[0]
    self.assert_(isinstance(data['a'], float))
    self.assertEqual(data['a'], 100.0)
def test_register_default(self):
    """register_default_jsonb() applies a custom loads to jsonb and jsonb[]."""
    cursor = self.conn.cursor()

    def decimal_loads(txt):
        return psycopg2.extras.json.loads(txt, parse_float=Decimal)

    psycopg2.extras.register_default_jsonb(cursor, loads=decimal_loads)

    # Scalar jsonb column goes through the custom loader.
    cursor.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
    row = cursor.fetchone()[0]
    self.assert_(isinstance(row['a'], Decimal))
    self.assertEqual(row['a'], Decimal('100.0'))

    # So does a jsonb[] array column.
    cursor.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""")
    row = cursor.fetchone()[0]
    self.assert_(isinstance(row[0]['a'], Decimal))
    self.assertEqual(row[0]['a'], Decimal('100.0'))
def get(self, key, default=None, version=None, acquire_lock=True):
    """Return the cached value for ``key``, or ``default`` when the entry
    is missing, expired, or cannot be unpickled.

    ``acquire_lock=False`` skips the reader/writer locks (caller must
    already hold them).
    """
    key = self.make_key(key, version=version)
    self.validate_key(key)
    # Read the raw pickled payload under the reader lock.
    with (self._lock.reader() if acquire_lock else dummy()):
        payload = None if self._has_expired(key) else self._cache[key]
    if payload is not None:
        try:
            return pickle.loads(payload)
        except pickle.PickleError:
            return default
    # Entry missing or expired: purge it under the writer lock.
    with (self._lock.writer() if acquire_lock else dummy()):
        try:
            del self._cache[key]
            del self._expire_info[key]
        except KeyError:
            pass
    return default
def load_object(self, value):
    """The reversal of :meth:`dump_object`. This might be called with
    None.
    """
    if value is None:
        return None
    # A leading ``!`` byte marks a pickled payload; a corrupt pickle
    # deliberately degrades to None rather than raising.
    if value[:1] == b'!':
        try:
            return pickle.loads(value[1:])
        except pickle.PickleError:
            return None
    # Otherwise the payload is either a bare integer ...
    try:
        return int(value)
    except ValueError:
        # before 0.8 we did not have serialization. Still support that.
        return value
def recv_json(self, flags=0, **kwargs):
    """Receive a message and deserialize it from JSON.

    Extra keyword arguments are forwarded to ``jsonapi.loads``.

    Parameters
    ----------
    flags : int
        Any valid recv flag.

    Returns
    -------
    obj : Python object
        The deserialized object that arrived as a message.
    """
    from zmq.utils import jsonapi
    raw = self.recv(flags)
    return jsonapi.loads(raw, **kwargs)
def deserialize(pkl):
    """Reconstruct the Python object stored in the pickled bytes *pkl*.

    NOTE(review): ``pickle.loads`` must never be fed untrusted data.
    """
    return pickle.loads(pkl)
def get_package(self, owner, package):
    """Fetch the stored contract for *owner*/*package*.

    Concrete backends return::

        {
            'template' : str(pickle.loads(query[0])),
            'example' : str(pickle.loads(query[1]))
        }

    Raises:
        NotImplementedError: always; subclasses must override.
    """
    raise NotImplementedError()
def get_package(self, owner, package):
    """Return the stored contract for *owner*/*package*.

    Unpickles and stringifies the template/example columns; returns
    False when no matching row exists.
    """
    query = ("SELECT template, example FROM public.contracts "
             "WHERE owner=? AND package=? ALLOW FILTERING")
    rows = self.prepare_execute_return(query, (owner, package))
    if not self.exists(rows):
        return False
    template, example = rows[0][0], rows[0][1]
    return {
        'template': str(pickle.loads(template)),
        'example': str(pickle.loads(example))
    }
def get_package(self, owner, package):
    """Return the stored contract for *owner*/*package* from SQLite.

    Returns:
        dict with 'template' and 'example' (unpickled, stringified),
        or False when no matching row exists.
    """
    query = self.connection.execute(
        "SELECT template, example FROM packages WHERE owner=? AND package=?",
        (owner, package)).fetchone()
    # Use identity comparison with None, not ``== None``.
    if query is None:
        return False
    # NOTE(review): pickle.loads on DB contents is unsafe if the database
    # can be written by untrusted parties.
    return {
        'template' : str(pickle.loads(query[0])),
        'example' : str(pickle.loads(query[1]))
    }
def getUserprops(self):
    """Return every stored user property.

    Each element of the returned list is a one-item dict mapping the
    property key to its unpickled value; the list is empty when no
    properties have been stored yet.
    """
    result = []
    for prop in self.session.query(Userprop).all():
        result.append({prop.key: pickle.loads(prop.value)})
    return result
def getUserprop(self, key):
    """
    Get user's property by key
    return property's value (unpickled)
    return None if the property is not set
    """
    instance = self.session.query(Userprop).filter_by(key=key).first()
    if instance:
        return pickle.loads(instance.value)
    else:
        return None
def getSearch(self):
    """
    Get last searching
    return the unpickled search object of the last search if any
    return None if there is no saved search yet
    """
    search = self.session.query(Search).first()
    if search:
        return pickle.loads(search.search_obj)
    else:
        return None
def resume(self):
    """Restore and re-raise any previously saved exception.

    No-op when no exception was saved on this instance.
    """
    if '_saved' not in vars(self):
        return
    # Renamed from ``type`` to avoid shadowing the builtin.
    exc_type, exc = map(pickle.loads, self._saved)
    six.reraise(exc_type, exc, self._tb)
def do_POST(self):
    # Handle a job hand-off POST from a worker node.
    if COORD.started:
        # Body length comes from the Content-Length header.
        src = self.rfile.read(int(self.headers['content-length']))
        # SECURITY NOTE(review): pickle.loads on network-supplied data is
        # unsafe if this endpoint is reachable by untrusted clients.
        job = COORD.next_job(pickle.loads(src))
        if job:
            self._send_answer(pickle.dumps(job))
            return
        # No further job available for this worker.
        self.send_response(404)
    else:
        # Coordinator not started yet - signal the client to retry later.
        self.send_response(202)
    self.end_headers()
def next_job(self, job):
    '''Sends a finished job back to the coordinator and retrieves in exchange the next one.
    Kwargs:
        job (WorkerJob): job that was finished by a worker and who's results are to be
            digested by the coordinator
    Returns:
        WorkerJob. next job of one of the running epochs that will get
            associated with the worker from the finished job and put into state 'running'
    '''
    if is_chief:
        # Try to find the epoch the job belongs to
        epoch = next((epoch for epoch in self._epochs_running if epoch.id == job.epoch_id), None)
        if epoch:
            # We are going to manipulate things - let's avoid undefined state
            with self._lock:
                # Let the epoch finish the job
                epoch.finish_job(job)
                # Check, if epoch is done now
                if epoch.done():
                    # If it declares itself done, move it from 'running' to 'done' collection
                    self._epochs_running.remove(epoch)
                    self._epochs_done.append(epoch)
                    # Show the short and/or full WER report
                    log_info(epoch)
        else:
            # There was no running epoch found for this job - this should never happen.
            log_error('There is no running epoch of id %d for job with ID %d.' % (job.epoch_id, job.id))
        # Chief path: hand back the next job directly.
        return self.get_job(job.worker)
    # We are a remote worker and have to hand over to the chief worker by HTTP
    result = self._talk_to_chief('', data=pickle.dumps(job))
    if result:
        # Chief answered with a pickled next job; otherwise fall through
        # and return the falsy result as-is.
        result = pickle.loads(result)
    return result
def do_POST(self):
    # Handle a job hand-off POST from a worker node.
    if COORD.started:
        # Body length comes from the Content-Length header.
        src = self.rfile.read(int(self.headers['content-length']))
        # SECURITY NOTE(review): pickle.loads on network-supplied data is
        # unsafe if this endpoint is reachable by untrusted clients.
        job = COORD.next_job(pickle.loads(src))
        if job:
            self._send_answer(pickle.dumps(job))
            return
        # No further job available for this worker.
        self.send_response(404)
    else:
        # Coordinator not started yet - signal the client to retry later.
        self.send_response(202)
    self.end_headers()
def do_POST(self):
    # Handle a job hand-off POST from a worker node.
    if COORD.started:
        # Body length comes from the Content-Length header.
        src = self.rfile.read(int(self.headers['content-length']))
        # SECURITY NOTE(review): pickle.loads on network-supplied data is
        # unsafe if this endpoint is reachable by untrusted clients.
        job = COORD.next_job(pickle.loads(src))
        if job:
            self._send_answer(pickle.dumps(job))
            return
        # No further job available for this worker.
        self.send_response(404)
    else:
        # Coordinator not started yet - signal the client to retry later.
        self.send_response(202)
    self.end_headers()