def test_long(self):
for proto in protocols:
# 256 bytes is where LONG4 begins.
for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
nbase = 1 << nbits
for npos in nbase-1, nbase, nbase+1:
for n in npos, -npos:
pickle = self.dumps(n, proto)
got = self.loads(pickle)
self.assertEqual(n, got)
# Try a monster. This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase = int("deadbeeffeedface", 16)
nbase += nbase << 1000000
for n in nbase, -nbase:
p = self.dumps(n, 2)
got = self.loads(p)
self.assertEqual(n, got)
python类pickle()的实例源码
def test_proto(self):
build_none = pickle.NONE + pickle.STOP
for proto in protocols:
expected = build_none
if proto >= 2:
expected = pickle.PROTO + bytes([proto]) + expected
p = self.dumps(None, proto)
self.assertEqual(p, expected)
oob = protocols[-1] + 1 # a future protocol
badpickle = pickle.PROTO + bytes([oob]) + build_none
try:
self.loads(badpickle)
except ValueError as detail:
self.assertTrue(str(detail).startswith(
"unsupported pickle protocol"))
else:
self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self):
# Map (proto, singleton) to expected opcode.
expected_opcode = {(0, None): pickle.NONE,
(1, None): pickle.NONE,
(2, None): pickle.NONE,
(3, None): pickle.NONE,
(0, True): pickle.INT,
(1, True): pickle.INT,
(2, True): pickle.NEWTRUE,
(3, True): pickle.NEWTRUE,
(0, False): pickle.INT,
(1, False): pickle.INT,
(2, False): pickle.NEWFALSE,
(3, False): pickle.NEWFALSE,
}
for proto in protocols:
for x in None, False, True:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertTrue(x is y, (proto, x, s, y))
expected = expected_opcode[proto, x]
self.assertEqual(opcode_in_pickle(expected, s), True)
def test_list_chunking(self):
n = 10 # too small to chunk
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
self.assertEqual(num_appends, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
if proto == 0:
self.assertEqual(num_appends, 0)
else:
self.assertTrue(num_appends >= 2)
def test_dict_chunking(self):
n = 10 # too small to chunk
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
self.assertIsInstance(s, bytes_types)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
self.assertEqual(num_setitems, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
if proto == 0:
self.assertEqual(num_setitems, 0)
else:
self.assertTrue(num_setitems >= 2)
def test_reduce_bad_iterator(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
# are not iterators
class C(object):
def __reduce__(self):
# 4th item is not an iterator
return list, (), None, [], None
class D(object):
def __reduce__(self):
# 5th item is not an iterator
return dict, (), None, None, []
# Protocol 0 is less strict and also accept iterables.
for proto in protocols:
try:
self.dumps(C(), proto)
except (pickle.PickleError):
pass
try:
self.dumps(D(), proto)
except (pickle.PickleError):
pass
def putmessage(self, message):
self.debug("putmessage:%d:" % message[0])
try:
s = pickle.dumps(message)
except pickle.PicklingError:
print("Cannot pickle:", repr(message), file=sys.__stderr__)
raise
s = struct.pack("<i", len(s)) + s
while len(s) > 0:
try:
r, w, x = select.select([], [self.sock], [])
n = self.sock.send(s[:BUFSIZE])
except (AttributeError, TypeError):
raise IOError("socket no longer exists")
except socket.error:
raise
else:
s = s[n:]
def test_long(self):
for proto in protocols:
# 256 bytes is where LONG4 begins.
for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
nbase = 1 << nbits
for npos in nbase-1, nbase, nbase+1:
for n in npos, -npos:
pickle = self.dumps(n, proto)
got = self.loads(pickle)
self.assertEqual(n, got)
# Try a monster. This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase = int("deadbeeffeedface", 16)
nbase += nbase << 1000000
for n in nbase, -nbase:
p = self.dumps(n, 2)
got = self.loads(p)
self.assertEqual(n, got)
def test_proto(self):
build_none = pickle.NONE + pickle.STOP
for proto in protocols:
expected = build_none
if proto >= 2:
expected = pickle.PROTO + bytes([proto]) + expected
p = self.dumps(None, proto)
self.assertEqual(p, expected)
oob = protocols[-1] + 1 # a future protocol
badpickle = pickle.PROTO + bytes([oob]) + build_none
try:
self.loads(badpickle)
except ValueError as detail:
self.assertTrue(str(detail).startswith(
"unsupported pickle protocol"))
else:
self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self):
# Map (proto, singleton) to expected opcode.
expected_opcode = {(0, None): pickle.NONE,
(1, None): pickle.NONE,
(2, None): pickle.NONE,
(3, None): pickle.NONE,
(0, True): pickle.INT,
(1, True): pickle.INT,
(2, True): pickle.NEWTRUE,
(3, True): pickle.NEWTRUE,
(0, False): pickle.INT,
(1, False): pickle.INT,
(2, False): pickle.NEWFALSE,
(3, False): pickle.NEWFALSE,
}
for proto in protocols:
for x in None, False, True:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertTrue(x is y, (proto, x, s, y))
expected = expected_opcode[proto, x]
self.assertEqual(opcode_in_pickle(expected, s), True)
def test_list_chunking(self):
n = 10 # too small to chunk
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
self.assertEqual(num_appends, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
if proto == 0:
self.assertEqual(num_appends, 0)
else:
self.assertTrue(num_appends >= 2)
def test_dict_chunking(self):
n = 10 # too small to chunk
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
self.assertIsInstance(s, bytes_types)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
self.assertEqual(num_setitems, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
if proto == 0:
self.assertEqual(num_setitems, 0)
else:
self.assertTrue(num_setitems >= 2)
def test_reduce_bad_iterator(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
# are not iterators
class C(object):
def __reduce__(self):
# 4th item is not an iterator
return list, (), None, [], None
class D(object):
def __reduce__(self):
# 5th item is not an iterator
return dict, (), None, None, []
# Protocol 0 is less strict and also accept iterables.
for proto in protocols:
try:
self.dumps(C(), proto)
except (pickle.PickleError):
pass
try:
self.dumps(D(), proto)
except (pickle.PickleError):
pass
def test_badly_quoted_string(self):
# Issue #17710
badpickles = [b"S'\n.",
b'S"\n.',
b'S\' \n.',
b'S" \n.',
b'S\'"\n.',
b'S"\'\n.',
b"S' ' \n.",
b'S" " \n.',
b"S ''\n.",
b'S ""\n.',
b'S \n.',
b'S\n.',
b'S.']
for p in badpickles:
self.check_unpickling_error(pickle.UnpicklingError, p)
def test_long(self):
for proto in protocols:
# 256 bytes is where LONG4 begins.
for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
nbase = 1 << nbits
for npos in nbase-1, nbase, nbase+1:
for n in npos, -npos:
pickle = self.dumps(n, proto)
got = self.loads(pickle)
self.assert_is_copy(n, got)
# Try a monster. This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase = int("deadbeeffeedface", 16)
nbase += nbase << 1000000
for n in nbase, -nbase:
p = self.dumps(n, 2)
got = self.loads(p)
# assert_is_copy is very expensive here as it precomputes
# a failure message by computing the repr() of n and got,
# we just do the check ourselves.
self.assertIs(type(got), int)
self.assertEqual(n, got)
def test_proto(self):
for proto in protocols:
pickled = self.dumps(None, proto)
if proto >= 2:
proto_header = pickle.PROTO + bytes([proto])
self.assertTrue(pickled.startswith(proto_header))
else:
self.assertEqual(count_opcode(pickle.PROTO, pickled), 0)
oob = protocols[-1] + 1 # a future protocol
build_none = pickle.NONE + pickle.STOP
badpickle = pickle.PROTO + bytes([oob]) + build_none
try:
self.loads(badpickle)
except ValueError as err:
self.assertIn("unsupported pickle protocol", str(err))
else:
self.fail("expected bad protocol number to raise ValueError")
def test_list_chunking(self):
n = 10 # too small to chunk
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
self.assertEqual(num_appends, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
if proto == 0:
self.assertEqual(num_appends, 0)
else:
self.assertTrue(num_appends >= 2)
def test_set_chunking(self):
n = 10 # too small to chunk
x = set(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_additems = count_opcode(pickle.ADDITEMS, s)
if proto < 4:
self.assertEqual(num_additems, 0)
else:
self.assertEqual(num_additems, 1)
n = 2500 # expect at least two chunks when proto >= 4
x = set(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_additems = count_opcode(pickle.ADDITEMS, s)
if proto < 4:
self.assertEqual(num_additems, 0)
else:
self.assertGreaterEqual(num_additems, 2)
def test_reduce_bad_iterator(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
# are not iterators
class C(object):
def __reduce__(self):
# 4th item is not an iterator
return list, (), None, [], None
class D(object):
def __reduce__(self):
# 5th item is not an iterator
return dict, (), None, None, []
# Protocol 0 is less strict and also accept iterables.
for proto in protocols:
try:
self.dumps(C(), proto)
except (pickle.PickleError):
pass
try:
self.dumps(D(), proto)
except (pickle.PickleError):
pass
def check_frame_opcodes(self, pickled):
"""
Check the arguments of FRAME opcodes in a protocol 4+ pickle.
"""
frame_opcode_size = 9
last_arg = last_pos = None
for op, arg, pos in pickletools.genops(pickled):
if op.name != 'FRAME':
continue
if last_pos is not None:
# The previous frame's size should be equal to the number
# of bytes up to the current frame.
frame_size = pos - last_pos - frame_opcode_size
self.assertEqual(frame_size, last_arg)
last_arg, last_pos = arg, pos
# The last frame's size should be equal to the number of bytes up
# to the pickle's end.
frame_size = len(pickled) - last_pos - frame_opcode_size
self.assertEqual(frame_size, last_arg)
def test_huge_bytes_32b(self, size):
data = b"abcd" * (size // 4)
try:
for proto in protocols:
if proto < 3:
continue
with self.subTest(proto=proto):
try:
pickled = self.dumps(data, protocol=proto)
header = (pickle.BINBYTES +
struct.pack("<I", len(data)))
data_start = pickled.index(data)
self.assertEqual(
header,
pickled[data_start-len(header):data_start])
finally:
pickled = None
finally:
data = None
def test_huge_str_32b(self, size):
data = "abcd" * (size // 4)
try:
for proto in protocols:
if proto == 0:
continue
with self.subTest(proto=proto):
try:
pickled = self.dumps(data, protocol=proto)
header = (pickle.BINUNICODE +
struct.pack("<I", len(data)))
data_start = pickled.index(b'abcd')
self.assertEqual(
header,
pickled[data_start-len(header):data_start])
self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
pickled.index(b"abcd")), len(data))
finally:
pickled = None
finally:
data = None
# BINUNICODE (protocols 1, 2 and 3) cannot carry more than 2**32 - 1 bytes
# of utf-8 encoded unicode. BINUNICODE8 (protocol 4) supports these huge
# unicode strings however.
def InitMessage(descriptor, cls):
cls._decoders_by_tag = {}
cls._extensions_by_name = {}
cls._extensions_by_number = {}
if (descriptor.has_options and
descriptor.GetOptions().message_set_wire_format):
cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = (
decoder.MessageSetItemDecoder(cls._extensions_by_number), None)
for field in descriptor.fields:
_AttachFieldHelpers(cls, field)
_AddEnumValues(descriptor, cls)
_AddInitMethod(descriptor, cls)
_AddPropertiesForFields(descriptor, cls)
_AddPropertiesForExtensions(descriptor, cls)
_AddStaticMethods(cls)
_AddMessageMethods(descriptor, cls)
_AddPrivateHelperMethods(descriptor, cls)
copyreg.pickle(cls, lambda obj: (cls, (), obj.__getstate__()))
def test_nonIdentityHash(self):
global ClassWithCustomHash
class ClassWithCustomHash(styles.Versioned):
def __init__(self, unique, hash):
self.unique = unique
self.hash = hash
def __hash__(self):
return self.hash
v1 = ClassWithCustomHash('v1', 0)
v2 = ClassWithCustomHash('v2', 0)
pkl = pickle.dumps((v1, v2))
del v1, v2
ClassWithCustomHash.persistenceVersion = 1
ClassWithCustomHash.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)
v1, v2 = pickle.loads(pkl)
styles.doUpgrade()
self.assertEqual(v1.unique, 'v1')
self.assertEqual(v2.unique, 'v2')
self.assertTrue(v1.upgraded)
self.assertTrue(v2.upgraded)
def test_upgradeDeserializesObjectsRequiringUpgrade(self):
global ToyClassA, ToyClassB
class ToyClassA(styles.Versioned):
pass
class ToyClassB(styles.Versioned):
pass
x = ToyClassA()
y = ToyClassB()
pklA, pklB = pickle.dumps(x), pickle.dumps(y)
del x, y
ToyClassA.persistenceVersion = 1
def upgradeToVersion1(self):
self.y = pickle.loads(pklB)
styles.doUpgrade()
ToyClassA.upgradeToVersion1 = upgradeToVersion1
ToyClassB.persistenceVersion = 1
ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)
x = pickle.loads(pklA)
styles.doUpgrade()
self.assertTrue(x.y.upgraded)
def _pickleFunction(f):
"""
Reduce, in the sense of L{pickle}'s C{object.__reduce__} special method, a
function object into its constituent parts.
@param f: The function to reduce.
@type f: L{types.FunctionType}
@return: a 2-tuple of a reference to L{_unpickleFunction} and a tuple of
its arguments, a 1-tuple of the function's fully qualified name.
@rtype: 2-tuple of C{callable, native string}
"""
if f.__name__ == '<lambda>':
raise _UniversalPicklingError(
"Cannot pickle lambda function: {}".format(f))
return (_unpickleFunction,
tuple([".".join([f.__module__, f.__qualname__])]))
def test_long(self):
for proto in protocols:
# 256 bytes is where LONG4 begins.
for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
nbase = 1 << nbits
for npos in nbase-1, nbase, nbase+1:
for n in npos, -npos:
pickle = self.dumps(n, proto)
got = self.loads(pickle)
self.assert_is_copy(n, got)
# Try a monster. This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase = int("deadbeeffeedface", 16)
nbase += nbase << 1000000
for n in nbase, -nbase:
p = self.dumps(n, 2)
got = self.loads(p)
# assert_is_copy is very expensive here as it precomputes
# a failure message by computing the repr() of n and got,
# we just do the check ourselves.
self.assertIs(type(got), int)
self.assertEqual(n, got)
def test_proto(self):
for proto in protocols:
pickled = self.dumps(None, proto)
if proto >= 2:
proto_header = pickle.PROTO + bytes([proto])
self.assertTrue(pickled.startswith(proto_header))
else:
self.assertEqual(count_opcode(pickle.PROTO, pickled), 0)
oob = protocols[-1] + 1 # a future protocol
build_none = pickle.NONE + pickle.STOP
badpickle = pickle.PROTO + bytes([oob]) + build_none
try:
self.loads(badpickle)
except ValueError as err:
self.assertIn("unsupported pickle protocol", str(err))
else:
self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self):
# Map (proto, singleton) to expected opcode.
expected_opcode = {(0, None): pickle.NONE,
(1, None): pickle.NONE,
(2, None): pickle.NONE,
(3, None): pickle.NONE,
(0, True): pickle.INT,
(1, True): pickle.INT,
(2, True): pickle.NEWTRUE,
(3, True): pickle.NEWTRUE,
(0, False): pickle.INT,
(1, False): pickle.INT,
(2, False): pickle.NEWFALSE,
(3, False): pickle.NEWFALSE,
}
for proto in protocols:
for x in None, False, True:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertTrue(x is y, (proto, x, s, y))
expected = expected_opcode[min(proto, 3), x]
self.assertTrue(opcode_in_pickle(expected, s))
def test_list_chunking(self):
n = 10 # too small to chunk
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
self.assertEqual(num_appends, proto > 0)
n = 2500 # expect at least two chunks when proto > 0
x = list(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_is_copy(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
if proto == 0:
self.assertEqual(num_appends, 0)
else:
self.assertTrue(num_appends >= 2)