def test_newobj_proxies(self):
    """Pickle weak proxies and check the result has the proxied object's type.

    NEWOBJ should use the ``__class__`` rather than the raw type, so a
    pickled ``weakref.proxy`` must load back as an instance of the class it
    wraps.  Runs for every entry in ``protocols`` and every class left in
    the local copy of ``myclasses``.
    """
    import weakref

    # Slice copy: entries are removed below without touching ``myclasses``.
    candidates = myclasses[:]
    # Cannot create weakproxies to these classes
    unproxyable = (MyInt, MyLong, MyStr, MyTuple)
    for excluded in unproxyable:
        candidates.remove(excluded)

    for proto in protocols:
        for C in candidates:
            base = C.__base__
            original = C(C.sample)
            original.foo = 42
            proxy = weakref.proxy(original)
            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)
            # The restored type must match the target, rather than type(proxy).
            self.assertEqual(type(restored), type(original))
            detail = (proto, C, base, original, restored, type(restored))
            # Compare through the base type and through the instance dicts.
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
# Example source code using Python's NEWOBJ (original heading: python类NEWOBJ的实例源码)
def test_newobj_proxies(self):
    """Round-trip weak proxies through pickle for every protocol and class.

    NEWOBJ should use the ``__class__`` rather than the raw type: the object
    loaded from a pickled proxy is expected to have the type of the proxied
    instance, and to compare equal to it via the base type and ``__dict__``.
    """
    import weakref

    proxyable = myclasses[:]  # private copy; ``remove`` mutates it
    # Cannot create weakproxies to these classes
    for skipped in (MyInt, MyLong, MyStr, MyTuple):
        proxyable.remove(skipped)

    for proto in protocols:
        for C in proxyable:
            B = C.__base__
            source = C(C.sample)
            source.foo = 42
            weak = weakref.proxy(source)
            data = self.dumps(weak, proto)
            loaded = self.loads(data)
            self.assertEqual(type(loaded), type(source))  # rather than type(weak)
            # Extra context passed as the assertion message on failure.
            detail = (proto, C, B, source, loaded, type(loaded))
            self.assertEqual(B(source), B(loaded), detail)
            self.assertEqual(source.__dict__, loaded.__dict__, detail)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_newobj_proxies(self):
    """Pickle weak proxies and check the result has the proxied object's type.

    NEWOBJ should use the ``__class__`` rather than the raw type.  Relies on
    ``weakref`` already being available in the enclosing module's scope.
    """
    # Slice copy: entries are removed below without touching ``myclasses``.
    candidates = myclasses[:]
    # Cannot create weakproxies to these classes
    unproxyable = (MyInt, MyTuple)
    for excluded in unproxyable:
        candidates.remove(excluded)

    for proto in protocols:
        for C in candidates:
            base = C.__base__
            original = C(C.sample)
            original.foo = 42
            proxy = weakref.proxy(original)
            pickled = self.dumps(proxy, proto)
            restored = self.loads(pickled)
            # The restored type must match the target, rather than type(proxy).
            self.assertEqual(type(restored), type(original))
            detail = (proto, C, base, original, restored, type(restored))
            self.assertEqual(base(original), base(restored), detail)
            self.assertEqual(original.__dict__, restored.__dict__, detail)
# Register a type with copyreg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_newobj_proxies(self):
    """Round-trip weak proxies through pickle for every protocol and class.

    NEWOBJ should use the ``__class__`` rather than the raw type: loading a
    pickled ``weakref.proxy`` is expected to yield an object whose type is
    that of the proxied instance.
    """
    proxyable = myclasses[:]  # private copy; ``remove`` mutates it
    # Cannot create weakproxies to these classes
    for skipped in (MyInt, MyTuple):
        proxyable.remove(skipped)

    for proto in protocols:
        for C in proxyable:
            B = C.__base__
            source = C(C.sample)
            source.foo = 42
            weak = weakref.proxy(source)
            data = self.dumps(weak, proto)
            loaded = self.loads(data)
            self.assertEqual(type(loaded), type(source))  # rather than type(weak)
            # Extra context passed as the assertion message on failure.
            detail = (proto, C, B, source, loaded, type(loaded))
            self.assertEqual(B(source), B(loaded), detail)
            self.assertEqual(source.__dict__, loaded.__dict__, detail)
def test_newobj_proxies(self):
    """Verify that a pickled weak proxy loads as the proxied object's class.

    NEWOBJ should use the ``__class__`` rather than the raw type.  Every
    class remaining in the local copy of ``myclasses`` is exercised under
    every entry of ``protocols``.
    """
    remaining = myclasses[:]  # copied so removals stay local to this test
    # Cannot create weakproxies to these classes
    not_proxyable = (MyInt, MyTuple)
    for klass in not_proxyable:
        remaining.remove(klass)

    for proto in protocols:
        for C in remaining:
            parent = C.__base__
            target = C(C.sample)
            target.foo = 42
            proxied = weakref.proxy(target)
            blob = self.dumps(proxied, proto)
            result = self.loads(blob)
            # Expect the target's type, rather than type(proxied).
            self.assertEqual(type(result), type(target))
            detail = (proto, C, parent, target, result, type(result))
            self.assertEqual(parent(target), parent(result), detail)
            self.assertEqual(target.__dict__, result.__dict__, detail)
# Register a type with copyreg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_simple_newobj(self):
    """Check that ``SimpleNewObj`` pickles with NEWOBJ from protocol 2 on.

    The instance is allocated with ``object.__new__`` to avoid ``__init__``,
    then round-tripped under each entry of ``protocols``.
    """
    original = object.__new__(SimpleNewObj)  # avoid __init__
    original.abc = 666
    for proto in protocols:
        pickled = self.dumps(original, proto)
        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)
        restored = self.loads(pickled)  # will raise TypeError if __init__ called
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_bad_stack(self):
    """Run ``check_unpickling_error`` over a table of malformed pickles.

    Each entry is passed to ``self.check_unpickling_error`` together with
    ``self.bad_stack_errors``.  NOTE(review): judging by the test name and
    the data, each entry issues an opcode with too few items on the
    unpickler stack -- confirm against ``check_unpickling_error``.
    """
    # Trailing comments name the opcode exercised by the entry (and by the
    # unlabelled entries that follow it).  Commented-out entries record
    # inputs that are skipped, with the stated reason.
    malformed = [
        '.',            # STOP
        '0',            # POP
        '1',            # POP_MARK
        '2',            # DUP
        # '(2',         # PyUnpickler doesn't raise
        'R',            # REDUCE
        ')R',
        'a',            # APPEND
        'Na',
        'b',            # BUILD
        'Nb',
        'd',            # DICT
        'e',            # APPENDS
        # '(e',         # PyUnpickler raises AttributeError
        'i__builtin__\nlist\n',  # INST
        'l',            # LIST
        'o',            # OBJ
        '(o',
        'p1\n',         # PUT
        'q\x00',        # BINPUT
        'r\x00\x00\x00\x00',  # LONG_BINPUT
        's',            # SETITEM
        'Ns',
        'NNs',
        't',            # TUPLE
        'u',            # SETITEMS
        # '(u',         # PyUnpickler doesn't raise
        '}(Nu',
        '\x81',         # NEWOBJ
        ')\x81',
        '\x85',         # TUPLE1
        '\x86',         # TUPLE2
        'N\x86',
        '\x87',         # TUPLE3
        'N\x87',
        'NN\x87',
    ]
    for data in malformed:
        self.check_unpickling_error(self.bad_stack_errors, data)
def test_bad_mark(self):
    """Run ``check_unpickling_error`` over pickles containing a stray MARK.

    Each entry is passed to ``self.check_unpickling_error`` together with
    ``self.bad_mark_errors``.  NOTE(review): judging by the test name and
    the data, each entry places a ``(`` (MARK) where the following opcode
    expects an operand -- confirm against ``check_unpickling_error``.
    """
    # Trailing/leading comments name the opcode exercised by the entries
    # that follow.  The commented-out STOP entry is kept as in the original.
    malformed = [
        # 'N(.',                        # STOP
        'N(2',                          # DUP
        'c__builtin__\nlist\n)(R',      # REDUCE
        'c__builtin__\nlist\n()R',
        ']N(a',                         # APPEND
        # BUILD
        'c__builtin__\nValueError\n)R}(b',
        'c__builtin__\nValueError\n)R(}b',
        '(Nd',                          # DICT
        'N(p1\n',                       # PUT
        'N(q\x00',                      # BINPUT
        'N(r\x00\x00\x00\x00',          # LONG_BINPUT
        '}NN(s',                        # SETITEM
        '}N(Ns',
        '}(NNs',
        '}((u',                         # SETITEMS
        # NEWOBJ
        'c__builtin__\nlist\n)(\x81',
        'c__builtin__\nlist\n()\x81',
        'N(\x85',                       # TUPLE1
        'NN(\x86',                      # TUPLE2
        'N(N\x86',
        'NNN(\x87',                     # TUPLE3
        'NN(N\x87',
        'N(NN\x87',
    ]
    for data in malformed:
        self.check_unpickling_error(self.bad_mark_errors, data)
def test_simple_newobj(self):
    """Round-trip a ``SimpleNewObj`` built with ``__new__(cls, 0xface)``.

    For each protocol the pickle must contain the expected encoding of the
    integer argument, must use NEWOBJ exactly when ``proto >= 2``, and must
    load back with the same ``abc`` attribute and ``__dict__``.
    """
    original = SimpleNewObj.__new__(SimpleNewObj, 0xface)  # avoid __init__
    original.abc = 666
    for proto in protocols:
        pickled = self.dumps(original, proto)
        # 0xface == 64206: text INT below protocol 1, BININT2 from protocol 1.
        expected_fragment = '\nI64206' if proto < 1 else 'M\xce\xfa'
        self.assertIn(expected_fragment, pickled)
        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)
        restored = self.loads(pickled)  # will raise TypeError if __init__ called
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_complex_newobj(self):
    """Round-trip a ``ComplexNewObj`` built with ``__new__(cls, 0xface)``.

    The fragment expected inside the pickle depends on the protocol: a text
    INT below protocol 1, a BININT2 at protocol 1, and the short binary
    string ``FACE`` from protocol 2 on.  NEWOBJ must appear exactly when
    ``proto >= 2``.
    """
    original = ComplexNewObj.__new__(ComplexNewObj, 0xface)  # avoid __init__
    original.abc = 666
    for proto in protocols:
        pickled = self.dumps(original, proto)
        if proto < 1:
            expected_fragment = '\nI64206'      # INT
        elif proto < 2:
            expected_fragment = 'M\xce\xfa'     # BININT2
        else:
            expected_fragment = 'U\x04FACE'     # SHORT_BINSTRING
        self.assertIn(expected_fragment, pickled)
        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)
        restored = self.loads(pickled)  # will raise TypeError if __init__ called
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_bad_stack(self):
    """Check that each malformed pickle below is rejected by the unpickler.

    Every entry is handed to ``self.check_unpickling_error`` with
    ``self.bad_stack_errors`` as the accepted error set.  NOTE(review): the
    entries appear to be opcodes run with too few stack items -- inferred
    from the test name and data, confirm against the helper.
    """
    # Comments name the opcode under test; unlabelled entries belong to the
    # preceding label.  Commented-out entries are known exceptions.
    bad_inputs = [
        '.',            # STOP
        '0',            # POP
        '1',            # POP_MARK
        '2',            # DUP
        # '(2',         # PyUnpickler doesn't raise
        'R',            # REDUCE
        ')R',
        'a',            # APPEND
        'Na',
        'b',            # BUILD
        'Nb',
        'd',            # DICT
        'e',            # APPENDS
        # '(e',         # PyUnpickler raises AttributeError
        'i__builtin__\nlist\n',  # INST
        'l',            # LIST
        'o',            # OBJ
        '(o',
        'p1\n',         # PUT
        'q\x00',        # BINPUT
        'r\x00\x00\x00\x00',  # LONG_BINPUT
        's',            # SETITEM
        'Ns',
        'NNs',
        't',            # TUPLE
        'u',            # SETITEMS
        # '(u',         # PyUnpickler doesn't raise
        '}(Nu',
        '\x81',         # NEWOBJ
        ')\x81',
        '\x85',         # TUPLE1
        '\x86',         # TUPLE2
        'N\x86',
        '\x87',         # TUPLE3
        'N\x87',
        'NN\x87',
    ]
    for payload in bad_inputs:
        self.check_unpickling_error(self.bad_stack_errors, payload)
def test_bad_mark(self):
    """Check that pickles with a misplaced MARK are rejected by the unpickler.

    Every entry is handed to ``self.check_unpickling_error`` with
    ``self.bad_mark_errors`` as the accepted error set.  NOTE(review): the
    entries appear to put ``(`` (MARK) where an operand is required --
    inferred from the test name and data, confirm against the helper.
    """
    # Comments name the opcode under test; unlabelled entries belong to the
    # preceding label.
    bad_inputs = [
        # 'N(.',                        # STOP
        'N(2',                          # DUP
        'c__builtin__\nlist\n)(R',      # REDUCE
        'c__builtin__\nlist\n()R',
        ']N(a',                         # APPEND
        # BUILD
        'c__builtin__\nValueError\n)R}(b',
        'c__builtin__\nValueError\n)R(}b',
        '(Nd',                          # DICT
        'N(p1\n',                       # PUT
        'N(q\x00',                      # BINPUT
        'N(r\x00\x00\x00\x00',          # LONG_BINPUT
        '}NN(s',                        # SETITEM
        '}N(Ns',
        '}(NNs',
        '}((u',                         # SETITEMS
        # NEWOBJ
        'c__builtin__\nlist\n)(\x81',
        'c__builtin__\nlist\n()\x81',
        'N(\x85',                       # TUPLE1
        'NN(\x86',                      # TUPLE2
        'N(N\x86',
        'NNN(\x87',                     # TUPLE3
        'NN(N\x87',
        'N(NN\x87',
    ]
    for payload in bad_inputs:
        self.check_unpickling_error(self.bad_mark_errors, payload)
def test_simple_newobj(self):
    """Pickle a ``SimpleNewObj`` created via ``__new__`` and verify the stream.

    Checks, per protocol, the encoding of the ``0xface`` argument, the
    presence of NEWOBJ (exactly when ``proto >= 2``) and that the loaded
    object carries the same attributes.
    """
    source = SimpleNewObj.__new__(SimpleNewObj, 0xface)  # avoid __init__
    source.abc = 666
    for proto in protocols:
        stream = self.dumps(source, proto)
        if proto >= 1:
            self.assertIn('M\xce\xfa', stream)  # BININT2
        else:
            self.assertIn('\nI64206', stream)  # INT
        newobj_present = opcode_in_pickle(pickle.NEWOBJ, stream)
        self.assertEqual(newobj_present, proto >= 2)
        loaded = self.loads(stream)  # will raise TypeError if __init__ called
        self.assertEqual(loaded.abc, 666)
        self.assertEqual(source.__dict__, loaded.__dict__)
def test_complex_newobj(self):
    """Pickle a ``ComplexNewObj`` created via ``__new__`` and verify the stream.

    The expected fragment is a text INT below protocol 1, a BININT2 at
    protocol 1 and the short binary string ``FACE`` from protocol 2 on;
    NEWOBJ must be present exactly when ``proto >= 2``.
    """
    source = ComplexNewObj.__new__(ComplexNewObj, 0xface)  # avoid __init__
    source.abc = 666
    for proto in protocols:
        stream = self.dumps(source, proto)
        if proto < 1:
            fragment = '\nI64206'       # INT
        elif proto < 2:
            fragment = 'M\xce\xfa'      # BININT2
        else:
            fragment = 'U\x04FACE'      # SHORT_BINSTRING
        self.assertIn(fragment, stream)
        newobj_present = opcode_in_pickle(pickle.NEWOBJ, stream)
        self.assertEqual(newobj_present, proto >= 2)
        loaded = self.loads(stream)  # will raise TypeError if __init__ called
        self.assertEqual(loaded.abc, 666)
        self.assertEqual(source.__dict__, loaded.__dict__)
def test_simple_newobj(self):
    """Verify NEWOBJ is emitted for ``SimpleNewObj`` exactly when proto >= 2.

    The instance comes from ``object.__new__`` so ``__init__`` is not run;
    it is dumped and loaded once per entry in ``protocols``.
    """
    source = object.__new__(SimpleNewObj)  # avoid __init__
    source.abc = 666
    for proto in protocols:
        stream = self.dumps(source, proto)
        newobj_present = opcode_in_pickle(pickle.NEWOBJ, stream)
        self.assertEqual(newobj_present, proto >= 2)
        loaded = self.loads(stream)  # will raise TypeError if __init__ called
        self.assertEqual(loaded.abc, 666)
        self.assertEqual(source.__dict__, loaded.__dict__)
def test_simple_newobj(self):
    """Round-trip a ``SimpleNewObj`` and check for the NEWOBJ opcode.

    NEWOBJ must be in the pickle exactly when ``proto >= 2``, and the loaded
    object must have the same ``abc`` value and ``__dict__``.
    """
    # Allocate without running __init__.
    instance = object.__new__(SimpleNewObj)
    instance.abc = 666
    for proto in protocols:
        payload = self.dumps(instance, proto)
        found = opcode_in_pickle(pickle.NEWOBJ, payload)
        self.assertEqual(found, proto >= 2)
        # will raise TypeError if __init__ called
        clone = self.loads(payload)
        self.assertEqual(clone.abc, 666)
        self.assertEqual(instance.__dict__, clone.__dict__)
def test_bad_mark(self):
    """Run ``check_unpickling_error`` over byte pickles with a stray MARK.

    Each entry is passed to ``self.check_unpickling_error`` together with
    ``self.bad_mark_errors``.  NOTE(review): judging by the test name and
    the data, each entry places a ``(`` (MARK) where the following opcode
    expects an operand -- confirm against ``check_unpickling_error``.
    """
    # Comments name the opcode under test; unlabelled entries belong to the
    # preceding label.  The commented-out STOP entry is kept as in the
    # original.
    malformed = [
        # b'N(.',                       # STOP
        b'N(2',                         # DUP
        b'cbuiltins\nlist\n)(R',        # REDUCE
        b'cbuiltins\nlist\n()R',
        b']N(a',                        # APPEND
        # BUILD
        b'cbuiltins\nValueError\n)R}(b',
        b'cbuiltins\nValueError\n)R(}b',
        b'(Nd',                         # DICT
        b'N(p1\n',                      # PUT
        b'N(q\x00',                     # BINPUT
        b'N(r\x00\x00\x00\x00',         # LONG_BINPUT
        b'}NN(s',                       # SETITEM
        b'}N(Ns',
        b'}(NNs',
        b'}((u',                        # SETITEMS
        b'cbuiltins\nlist\n)(\x81',     # NEWOBJ
        b'cbuiltins\nlist\n()\x81',
        b'N(\x85',                      # TUPLE1
        b'NN(\x86',                     # TUPLE2
        b'N(N\x86',
        b'NNN(\x87',                    # TUPLE3
        b'NN(N\x87',
        b'N(NN\x87',
        b']((\x90',                     # ADDITEMS
        # NEWOBJ_EX
        b'cbuiltins\nlist\n)}(\x92',
        b'cbuiltins\nlist\n)(}\x92',
        b'cbuiltins\nlist\n()}\x92',
        # STACK_GLOBAL
        b'Vbuiltins\n(Vlist\n\x93',
        b'Vbuiltins\nVlist\n(\x93',
        b'N(\x94',                      # MEMOIZE
    ]
    for data in malformed:
        self.check_unpickling_error(self.bad_mark_errors, data)
def test_simple_newobj(self):
    """Check which construction opcode is used for ``SimpleNewObj``.

    NEWOBJ must appear for protocols 2 and 3, NEWOBJ_EX from protocol 4 on.
    The loaded object is compared with ``self.assert_is_copy``.
    """
    original = object.__new__(SimpleNewObj)  # avoid __init__
    original.abc = 666
    for proto in protocols:
        pickled = self.dumps(original, proto)
        has_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(has_newobj, 2 <= proto < 4)
        has_newobj_ex = opcode_in_pickle(pickle.NEWOBJ_EX, pickled)
        self.assertEqual(has_newobj_ex, proto >= 4)
        restored = self.loads(pickled)  # will raise TypeError if __init__ called
        self.assert_is_copy(original, restored)
def test_simple_newobj(self):
    """Check that ``SimpleNewObj`` pickles with NEWOBJ from protocol 2 on.

    The instance is allocated with ``object.__new__`` to avoid ``__init__``,
    then dumped and loaded under each entry of ``protocols``.
    """
    original = object.__new__(SimpleNewObj)  # avoid __init__
    original.abc = 666
    for proto in protocols:
        pickled = self.dumps(original, proto)
        uses_newobj = opcode_in_pickle(pickle.NEWOBJ, pickled)
        self.assertEqual(uses_newobj, proto >= 2)
        restored = self.loads(pickled)  # will raise TypeError if __init__ called
        self.assertEqual(restored.abc, 666)
        self.assertEqual(original.__dict__, restored.__dict__)
def test_simple_newobj(self):
    """Verify NEWOBJ vs. NEWOBJ_EX selection when pickling ``SimpleNewObj``.

    Protocols 2 and 3 must emit NEWOBJ; protocol 4 and later must emit
    NEWOBJ_EX.  The round-tripped object is checked with
    ``self.assert_is_copy``.
    """
    source = object.__new__(SimpleNewObj)  # avoid __init__
    source.abc = 666
    for proto in protocols:
        stream = self.dumps(source, proto)
        newobj_present = opcode_in_pickle(pickle.NEWOBJ, stream)
        self.assertEqual(newobj_present, 2 <= proto < 4)
        newobj_ex_present = opcode_in_pickle(pickle.NEWOBJ_EX, stream)
        self.assertEqual(newobj_ex_present, proto >= 4)
        loaded = self.loads(stream)  # will raise TypeError if __init__ called
        self.assert_is_copy(source, loaded)
def save_reduce(self, func, args, state=None,
                listitems=None, dictitems=None, obj=None):
    """Write the opcodes for a reduce tuple, dropping ``__transient__`` state.

    Modified to support ``__transient__`` on new objects.  The change only
    affects protocol level 2 (which is always used by PiCloud).

    Args:
        func: Callable from the reduce tuple.  When ``self.proto >= 2`` and
            its ``__name__`` is ``"__newobj__"``, the NEWOBJ opcode is used
            instead of REDUCE.
        args: Tuple of arguments for ``func``.  On the NEWOBJ path
            ``args[0]`` is the class and the rest are its arguments.
        state: Optional object state; saved and followed by BUILD.  On the
            NEWOBJ path, keys listed in ``obj.__transient__`` are removed
            from a copy of a mapping-like state before it is saved.
        listitems: Optional iterable passed to ``self._batch_appends``.
        dictitems: Optional iterable passed to ``self._batch_setitems``.
        obj: Optional object being pickled; memoized once the construction
            opcode has been written.

    Raises:
        pickle.PicklingError: If ``args`` is not a tuple, ``func`` is not
            callable, or the ``__newobj__`` arguments are empty, lack a
            ``__new__`` or name a class other than ``obj.__class__``.
    """
    # Assert that args is a tuple
    if not isinstance(args, tuple):
        raise pickle.PicklingError("args from reduce() should be a tuple")

    # Assert that func is callable
    if not hasattr(func, '__call__'):
        raise pickle.PicklingError("func from reduce should be callable")

    save = self.save
    write = self.write

    # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
    if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
        # An empty tuple used to surface as a bare IndexError on args[0].
        if not args:
            raise pickle.PicklingError("__newobj__ arglist is empty")
        cls = args[0]
        if not hasattr(cls, "__new__"):
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has no __new__")
        if obj is not None and cls is not obj.__class__:
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has the wrong class")
        args = args[1:]
        save(cls)

        # Don't pickle transient entries.  Only a mapping-like state can be
        # filtered; None or e.g. a tuple state is passed through untouched
        # instead of failing on ``state.copy()``.
        if (hasattr(obj, '__transient__')
                and hasattr(state, 'copy') and hasattr(state, 'keys')):
            transient = obj.__transient__
            state = state.copy()  # never mutate the caller's state
            for key in list(state.keys()):
                if key in transient:
                    del state[key]

        save(args)
        write(pickle.NEWOBJ)
    else:
        save(func)
        save(args)
        write(pickle.REDUCE)

    if obj is not None:
        self.memoize(obj)

    # More new special cases (that work with older protocols as
    # well): when __reduce__ returns a tuple with 4 or 5 items,
    # the 4th and 5th item should be iterators that provide list
    # items and dict items (as (key, value) tuples), or None.
    if listitems is not None:
        self._batch_appends(listitems)

    if dictitems is not None:
        self._batch_setitems(dictitems)

    if state is not None:
        save(state)
        write(pickle.BUILD)
def save_reduce(self, func, args, state=None,
                listitems=None, dictitems=None, obj=None):
    """Write the opcodes for a reduce tuple, dropping ``__transient__`` state.

    Modified to support ``__transient__`` on new objects.  The change only
    affects protocol level 2 (which is always used by PiCloud).

    Args:
        func: Callable from the reduce tuple.  When ``self.proto >= 2`` and
            its ``__name__`` is ``"__newobj__"``, the NEWOBJ opcode is used
            instead of REDUCE.
        args: Tuple of arguments for ``func``.  On the NEWOBJ path
            ``args[0]`` is the class and the rest are its arguments.
        state: Optional object state; saved and followed by BUILD.  On the
            NEWOBJ path, keys listed in ``obj.__transient__`` are removed
            from a copy of a mapping-like state before it is saved.
        listitems: Optional iterable passed to ``self._batch_appends``.
        dictitems: Optional iterable passed to ``self._batch_setitems``.
        obj: Optional object being pickled; memoized once the construction
            opcode has been written.

    Raises:
        pickle.PicklingError: If ``args`` is not a tuple, ``func`` is not
            callable, or the ``__newobj__`` arguments are empty, lack a
            ``__new__`` or name a class other than ``obj.__class__``.
    """
    # Assert that args is a tuple
    if not isinstance(args, tuple):
        raise pickle.PicklingError("args from reduce() should be a tuple")

    # Assert that func is callable
    if not hasattr(func, '__call__'):
        raise pickle.PicklingError("func from reduce should be callable")

    save = self.save
    write = self.write

    # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
    if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
        # An empty tuple used to surface as a bare IndexError on args[0].
        if not args:
            raise pickle.PicklingError("__newobj__ arglist is empty")
        cls = args[0]
        if not hasattr(cls, "__new__"):
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has no __new__")
        if obj is not None and cls is not obj.__class__:
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has the wrong class")
        args = args[1:]
        save(cls)

        # Don't pickle transient entries.  Only a mapping-like state can be
        # filtered; None or e.g. a tuple state is passed through untouched
        # instead of failing on ``state.copy()``.
        if (hasattr(obj, '__transient__')
                and hasattr(state, 'copy') and hasattr(state, 'keys')):
            transient = obj.__transient__
            state = state.copy()  # never mutate the caller's state
            for key in list(state.keys()):
                if key in transient:
                    del state[key]

        save(args)
        write(pickle.NEWOBJ)
    else:
        save(func)
        save(args)
        write(pickle.REDUCE)

    if obj is not None:
        self.memoize(obj)

    # More new special cases (that work with older protocols as
    # well): when __reduce__ returns a tuple with 4 or 5 items,
    # the 4th and 5th item should be iterators that provide list
    # items and dict items (as (key, value) tuples), or None.
    if listitems is not None:
        self._batch_appends(listitems)

    if dictitems is not None:
        self._batch_setitems(dictitems)

    if state is not None:
        save(state)
        write(pickle.BUILD)
def test_bad_stack(self):
    """Run ``check_unpickling_error`` over a table of malformed byte pickles.

    Each entry is passed to ``self.check_unpickling_error`` together with
    ``self.bad_stack_errors``.  NOTE(review): judging by the test name and
    the data, each entry issues an opcode with too few items on the
    unpickler stack -- confirm against ``check_unpickling_error``.
    """
    # Trailing comments name the opcode exercised by the entry (and by the
    # unlabelled entries that follow it).  Commented-out entries record
    # inputs that are skipped, with the stated reason.
    malformed = [
        b'.',            # STOP
        b'0',            # POP
        b'1',            # POP_MARK
        b'2',            # DUP
        # b'(2',         # PyUnpickler doesn't raise
        b'R',            # REDUCE
        b')R',
        b'a',            # APPEND
        b'Na',
        b'b',            # BUILD
        b'Nb',
        b'd',            # DICT
        b'e',            # APPENDS
        # b'(e',         # PyUnpickler raises AttributeError
        b'ibuiltins\nlist\n',  # INST
        b'l',            # LIST
        b'o',            # OBJ
        b'(o',
        b'p1\n',         # PUT
        b'q\x00',        # BINPUT
        b'r\x00\x00\x00\x00',  # LONG_BINPUT
        b's',            # SETITEM
        b'Ns',
        b'NNs',
        b't',            # TUPLE
        b'u',            # SETITEMS
        # b'(u',         # PyUnpickler doesn't raise
        b'}(Nu',
        b'\x81',         # NEWOBJ
        b')\x81',
        b'\x85',         # TUPLE1
        b'\x86',         # TUPLE2
        b'N\x86',
        b'\x87',         # TUPLE3
        b'N\x87',
        b'NN\x87',
        b'\x90',         # ADDITEMS
        # b'(\x90',      # PyUnpickler raises AttributeError
        b'\x91',         # FROZENSET
        b'\x92',         # NEWOBJ_EX
        b')}\x92',
        b'\x93',         # STACK_GLOBAL
        b'Vlist\n\x93',
        b'\x94',         # MEMOIZE
    ]
    for data in malformed:
        self.check_unpickling_error(self.bad_stack_errors, data)
def save_reduce(self, func, args, state=None,
                listitems=None, dictitems=None, obj=None):
    """Write the opcodes that rebuild an object from its reduce tuple.

    Args:
        func: Callable from the reduce tuple.  When ``self.proto >= 2`` and
            its ``__name__`` is ``"__newobj__"``, the NEWOBJ opcode is used
            instead of REDUCE.
        args: Tuple of arguments for ``func``.  On the NEWOBJ path
            ``args[0]`` is the class and the rest are its arguments.
        state: Optional object state; saved and followed by BUILD.
        listitems: Optional iterable passed to ``self._batch_appends``.
        dictitems: Optional iterable passed to ``self._batch_setitems``.
        obj: Optional object being pickled; memoized once the construction
            opcode has been written.

    Raises:
        pickle.PicklingError: If ``args`` is not a tuple, ``func`` is not
            callable, or the ``__newobj__`` arguments are empty, lack a
            ``__new__`` or name a class other than ``obj.__class__``.
    """
    # Assert that args is a tuple
    if not isinstance(args, tuple):
        raise pickle.PicklingError("args from reduce() should be a tuple")

    # Assert that func is callable
    if not hasattr(func, '__call__'):
        raise pickle.PicklingError("func from reduce should be callable")

    save = self.save
    write = self.write

    # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
    if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
        # An empty tuple used to surface as a bare IndexError on args[0].
        if not args:
            raise pickle.PicklingError("__newobj__ arglist is empty")
        cls = args[0]
        if not hasattr(cls, "__new__"):
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has no __new__")
        if obj is not None and cls is not obj.__class__:
            raise pickle.PicklingError(
                "args[0] from __newobj__ args has the wrong class")
        args = args[1:]
        save(cls)
        save(args)
        write(pickle.NEWOBJ)
    else:
        save(func)
        save(args)
        write(pickle.REDUCE)

    if obj is not None:
        self.memoize(obj)

    # More new special cases (that work with older protocols as
    # well): when __reduce__ returns a tuple with 4 or 5 items,
    # the 4th and 5th item should be iterators that provide list
    # items and dict items (as (key, value) tuples), or None.
    if listitems is not None:
        self._batch_appends(listitems)

    if dictitems is not None:
        self._batch_setitems(dictitems)

    if state is not None:
        save(state)
        write(pickle.BUILD)