def test_dict_chunking(self):
    """Verify SETITEMS chunking behaviour when pickling dicts.

    A 10-element dict is below the chunking threshold, so protocols > 0
    emit exactly one SETITEMS opcode (protocol 0 emits none).  A
    2500-element dict must be split into at least two SETITEMS chunks
    for protocols > 0.
    """
    # Small case: too small to chunk.
    small = dict.fromkeys(range(10))
    for proto in protocols:
        pickled = self.dumps(small, proto)
        self.assertEqual(small, self.loads(pickled))
        # Exactly one SETITEMS for proto > 0, none for proto 0.
        self.assertEqual(count_opcode(pickle.SETITEMS, pickled), proto > 0)

    # Large case: expect at least two chunks when proto > 0.
    large = dict.fromkeys(range(2500))
    for proto in protocols:
        pickled = self.dumps(large, proto)
        self.assertEqual(large, self.loads(pickled))
        num_setitems = count_opcode(pickle.SETITEMS, pickled)
        if proto == 0:
            self.assertEqual(num_setitems, 0)
        else:
            self.assertTrue(num_setitems >= 2)
# Example source snippets demonstrating Python pickle() usage
def test_reduce_bad_iterator(self):
    """Issue 4176: pickling must not crash when the 4th and 5th items
    returned by __reduce__() are not iterators."""
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 is less strict and also accepts iterables, so we only
    # check that each dump either succeeds or raises a pickling error
    # rather than crashing the interpreter.
    expected_errors = (AttributeError, pickle.PickleError, cPickle.PickleError)
    for proto in protocols:
        try:
            self.dumps(C(), proto)
        except expected_errors:
            pass
        try:
            self.dumps(D(), proto)
        except expected_errors:
            pass
def InitMessage(descriptor, cls):
    """Wire up a generated protobuf message class from its descriptor.

    Initialises the decoder/extension registries on *cls*, attaches
    per-field helpers, enum values, the generated __init__, properties,
    and message methods, and finally registers a pickle reducer so
    instances round-trip through __getstate__.  Call order matters:
    later helpers rely on the registries set up first.
    """
    cls._decoders_by_tag = {}
    cls._extensions_by_name = {}
    cls._extensions_by_number = {}
    # MessageSet wire format needs a dedicated item decoder registered
    # under the special MESSAGE_SET_ITEM_TAG.
    if (descriptor.has_options and
        descriptor.GetOptions().message_set_wire_format):
        cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = (
            decoder.MessageSetItemDecoder(cls._extensions_by_number), None)
    # Attach encode/decode helpers for every declared field.
    for field in descriptor.fields:
        _AttachFieldHelpers(cls, field)
    _AddEnumValues(descriptor, cls)
    _AddInitMethod(descriptor, cls)
    _AddPropertiesForFields(descriptor, cls)
    _AddPropertiesForExtensions(descriptor, cls)
    _AddStaticMethods(cls)
    _AddMessageMethods(descriptor, cls)
    _AddPrivateHelperMethods(descriptor, cls)
    # Reconstruct pickled instances by re-creating the class and
    # restoring the captured __getstate__ payload.
    copyreg.pickle(cls, lambda obj: (cls, (), obj.__getstate__()))
def test_nonIdentityHash(self):
    """Versioned upgrade must treat objects with equal hashes as
    distinct instances."""
    global ClassWithCustomHash

    class ClassWithCustomHash(styles.Versioned):
        def __init__(self, unique, hash_value):
            self.unique = unique
            self.hash = hash_value

        def __hash__(self):
            return self.hash

    # Two distinct objects that deliberately share a hash value.
    v1 = ClassWithCustomHash('v1', 0)
    v2 = ClassWithCustomHash('v2', 0)
    pkl = pickle.dumps((v1, v2))
    del v1, v2

    # Bump the persistence version and install an upgrade hook before
    # the objects are deserialised.
    ClassWithCustomHash.persistenceVersion = 1
    ClassWithCustomHash.upgradeToVersion1 = (
        lambda self: setattr(self, 'upgraded', True))
    v1, v2 = pickle.loads(pkl)
    styles.doUpgrade()

    # Each instance must have been upgraded independently.
    self.assertEqual(v1.unique, 'v1')
    self.assertEqual(v2.unique, 'v2')
    self.assertTrue(v1.upgraded)
    self.assertTrue(v2.upgraded)
def test_upgradeDeserializesObjectsRequiringUpgrade(self):
    """An upgrade hook that itself deserialises another object needing
    an upgrade must leave both objects fully upgraded.

    NOTE(review): the flattened source lost the nesting here; the
    `styles.doUpgrade()` on the line after `pickle.loads(pklB)` belongs
    inside `upgradeToVersion1` (assigning the hook after a top-level
    doUpgrade would be meaningless) — restored accordingly.
    """
    global ToyClassA, ToyClassB

    class ToyClassA(styles.Versioned):
        pass

    class ToyClassB(styles.Versioned):
        pass

    x = ToyClassA()
    y = ToyClassB()
    pklA, pklB = pickle.dumps(x), pickle.dumps(y)
    del x, y

    ToyClassA.persistenceVersion = 1

    def upgradeToVersion1(self):
        # Deserialising B here forces a nested doUpgrade pass while A's
        # own upgrade is still in flight.
        self.y = pickle.loads(pklB)
        styles.doUpgrade()

    ToyClassA.upgradeToVersion1 = upgradeToVersion1
    ToyClassB.persistenceVersion = 1
    ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)

    x = pickle.loads(pklA)
    styles.doUpgrade()
    self.assertTrue(x.y.upgraded)
def pickleMethod(method):
    'support function for copy_reg to pickle method refs'
    # Reduce a bound method to the pieces unpickleMethod needs to
    # rebind it: function name, instance, and defining class.
    state = (method.im_func.__name__,
             method.im_self,
             method.im_class)
    return unpickleMethod, state
def pickleModule(module):
    'support function for copy_reg to pickle module refs'
    # A module is fully identified by its dotted name.
    module_name = module.__name__
    return unpickleModule, (module_name,)
def pickleStringO(stringo):
    'support function for copy_reg to pickle StringIO.OutputTypes'
    # Capture both the buffer contents and the current seek position.
    contents = stringo.getvalue()
    position = stringo.tell()
    return unpickleStringO, (contents, position)
def _ufunc_reconstruct(module, name):
# The `fromlist` kwarg is required to ensure that `mod` points to the
# inner-most module rather than the parent package when module name is
# nested. This makes it possible to pickle non-toplevel ufuncs such as
# scipy.special.expit for instance.
mod = __import__(module, fromlist=[name])
return getattr(mod, name)
def _ufunc_reduce(func):
    """Pickle-reduce a ufunc to (reconstructor, (module, name)) so it is
    re-imported by name on load."""
    from pickle import whichmodule
    name = func.__name__
    module = whichmodule(func, name)
    return _ufunc_reconstruct, (module, name)
def _unchanged(self, response):
session_pickled = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
response.session_pickled = session_pickled
session_hash = hashlib.md5(session_pickled).hexdigest()
return response.session_hash == session_hash
def _try_store_in_db(self, request, response):
    """Persist this session to its database table.

    Returns True when a row was written (and the session cookie
    refreshed), False when saving was skipped.  Skips for file-based
    sessions, forgotten sessions, or unchanged existing sessions.
    """
    # don't save if file-based sessions,
    # no session id, or session being forgotten
    # or no changes to session (Unless the session is new)
    if (not response.session_db_table
            or self._forget
            or (self._unchanged(response) and not response.session_new)):
        # Drop this app from the db-backed-session registry when it is
        # no longer using db sessions.
        if (not response.session_db_table
                and global_settings.db_sessions is not True
                and response.session_masterapp in global_settings.db_sessions):
            global_settings.db_sessions.remove(response.session_masterapp)
        # self.clear_session_cookies()
        self.save_session_id_cookie()
        return False
    table = response.session_db_table
    record_id = response.session_db_record_id
    # New sessions get a fresh unique key; existing ones keep theirs so
    # the '<record_id>:<unique_key>' session id stays valid.
    if response.session_new:
        unique_key = web2py_uuid()
    else:
        unique_key = response.session_db_unique_key
    # Reuse the pickle cached by _unchanged() when available.
    session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
    dd = dict(locked=False,
              client_ip=response.session_client,
              modified_datetime=request.now,
              session_data=session_pickled,
              unique_key=unique_key)
    if record_id:
        # The row may have disappeared; a failed update falls through
        # to an insert below.
        if not table._db(table.id == record_id).update(**dd):
            record_id = None
    if not record_id:
        record_id = table.insert(**dd)
    response.session_id = '%s:%s' % (record_id, unique_key)
    response.session_db_unique_key = unique_key
    response.session_db_record_id = record_id
    self.save_session_id_cookie()
    return True
def load_storage(filename):
    """Unpickle the contents of *filename* (under a file lock) and wrap
    them in a Storage object."""
    locked_file = None
    try:
        locked_file = portalocker.LockedFile(filename, 'rb')
        payload = pickle.load(locked_file)
    finally:
        # Release the lock even when opening or unpickling fails.
        if locked_file:
            locked_file.close()
    return Storage(payload)
def save_storage(storage, filename):
    """Pickle *storage* (coerced to a plain dict) into *filename* under
    a file lock."""
    locked_file = None
    try:
        locked_file = portalocker.LockedFile(filename, 'wb')
        pickle.dump(dict(storage), locked_file)
    finally:
        # Release the lock even when opening or pickling fails.
        if locked_file:
            locked_file.close()
def TAG_pickler(data):
    """Reduce a TAG helper by serialising its state through a plain DIV
    proxy, which TAG_unpickler re-inflates on load."""
    proxy = DIV()
    proxy.__dict__ = data.__dict__
    marshal_dump = pickle.dumps(proxy, pickle.HIGHEST_PROTOCOL)
    return (TAG_unpickler, (marshal_dump,))
def install():
    """Register the traceback pickler with copy_reg (py2) / copyreg (py3)."""
    try:
        import copy_reg  # Python 2 spelling
    except ImportError:
        import copyreg as copy_reg  # Python 3 fallback under the old name
    copy_reg.pickle(TracebackType, pickle_traceback)
# Added by gevent
# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
def _import_dump_load():
global dumps
global loads
try:
import cPickle as pickle
except ImportError:
import pickle
dumps = pickle.dumps
loads = pickle.loads
def dump_traceback(tb):
    """Initialise lazily, then return *tb* pickled to bytes.

    Both _init and dump/load have to be unlocked, because copy_reg and
    pickle can do imports to resolve class names; those class names are
    in this module and greenlet safe though.
    """
    _init()
    return dumps(tb)
def install():
    """Hook traceback pickling into copy_reg, whichever spelling the
    running Python provides."""
    try:
        import copy_reg
    except ImportError:
        # Python 3 renamed the module; alias it back.
        import copyreg as copy_reg
    copy_reg.pickle(TracebackType, pickle_traceback)
# Added by gevent
# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
def _import_dump_load():
global dumps
global loads
try:
import cPickle as pickle
except ImportError:
import pickle
dumps = pickle.dumps
loads = pickle.loads