def default(self, obj):
"""
Encodes unknown object types (we use it to make extended types)
"""
if isinstance(obj, datetime.datetime):
# Datetimes. Make sure it's naive.
if obj.tzinfo is not None:
raise TypeError("Cannot encode timezone-aware datetimes to msgpack")
# Then, work out the timestamp in seconds
seconds = (obj - datetime.datetime(1970, 1, 1)).total_seconds()
microseconds = int(seconds * 1000000)
# Then pack it into a big-endian signed 64-bit integer
return msgpack.ExtType(self.EXT_DATETIME, self.STRUCT_DATETIME.pack(microseconds))
elif isinstance(obj, currint.Amount):
# Currint. Start with the lowercased currency code as bytes
code = obj.currency.code.upper()
if isinstance(code, six.text_type):
code = code.encode("ascii")
# Then pack it in with the minor value
return msgpack.ExtType(self.EXT_CURRINT, self.STRUCT_CURRINT.pack(code, obj.value))
else:
# Wuh-woh
raise TypeError("Cannot encode value to msgpack: %r" % (obj,))
python类ExtType()的实例源码
def test_unpacker_ext_hook():
class MyUnpacker(Unpacker):
def __init__(self):
super(MyUnpacker, self).__init__(ext_hook=self._hook,
encoding='utf-8')
def _hook(self, code, data):
if code == 1:
return int(data)
else:
return ExtType(code, data)
unpacker = MyUnpacker()
unpacker.feed(packb({'a': 1}, encoding='utf-8'))
assert unpacker.unpack() == {'a': 1}
unpacker.feed(packb({'a': ExtType(1, b'123')}, encoding='utf-8'))
assert unpacker.unpack() == {'a': 123}
unpacker.feed(packb({'a': ExtType(2, b'321')}, encoding='utf-8'))
assert unpacker.unpack() == {'a': ExtType(2, b'321')}
def test_ext():
def check(ext, packed):
assert packb(ext) == packed
assert unpackb(packed) == ext
check(ExtType(0x42, b'Z'), b'\xd4\x42Z') # fixext 1
check(ExtType(0x42, b'ZZ'), b'\xd5\x42ZZ') # fixext 2
check(ExtType(0x42, b'Z'*4), b'\xd6\x42' + b'Z'*4) # fixext 4
check(ExtType(0x42, b'Z'*8), b'\xd7\x42' + b'Z'*8) # fixext 8
check(ExtType(0x42, b'Z'*16), b'\xd8\x42' + b'Z'*16) # fixext 16
# ext 8
check(ExtType(0x42, b''), b'\xc7\x00\x42')
check(ExtType(0x42, b'Z'*255), b'\xc7\xff\x42' + b'Z'*255)
# ext 16
check(ExtType(0x42, b'Z'*256), b'\xc8\x01\x00\x42' + b'Z'*256)
check(ExtType(0x42, b'Z'*0xffff), b'\xc8\xff\xff\x42' + b'Z'*0xffff)
# ext 32
check(ExtType(0x42, b'Z'*0x10000), b'\xc9\x00\x01\x00\x00\x42' + b'Z'*0x10000)
# needs large memory
#check(ExtType(0x42, b'Z'*0xffffffff),
# b'\xc9\xff\xff\xff\xff\x42' + b'Z'*0xffffffff)
def test_extension_type():
def default(obj):
print('default called', obj)
if isinstance(obj, array.array):
typecode = 123 # application specific typecode
data = obj.tostring()
return ExtType(typecode, data)
raise TypeError("Unknown type object %r" % (obj,))
def ext_hook(code, data):
print('ext_hook called', code, data)
assert code == 123
obj = array.array('d')
obj.fromstring(data)
return obj
obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])]
s = msgpack.packb(obj, default=default)
obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
assert obj == obj2
def default(self, obj):
replacer = self.__type_replacements.get(type(obj), None)
if replacer:
obj = replacer(obj)
if isinstance(obj, set):
return tuple(obj) # msgpack module can't deal with sets so we make a tuple out of it
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, bytearray):
return bytes(obj)
if isinstance(obj, complex):
return msgpack.ExtType(0x30, struct.pack("dd", obj.real, obj.imag))
if isinstance(obj, datetime.datetime):
if obj.tzinfo:
raise errors.SerializeError("msgpack cannot serialize datetime with timezone info")
return msgpack.ExtType(0x32, struct.pack("d", obj.timestamp()))
if isinstance(obj, datetime.date):
return msgpack.ExtType(0x33, struct.pack("l", obj.toordinal()))
if isinstance(obj, decimal.Decimal):
return str(obj)
if isinstance(obj, numbers.Number):
return msgpack.ExtType(0x31, str(obj).encode("ascii")) # long
return self.class_to_dict(obj)
def default(obj):
if isinstance(obj, PasteFile):
return msgpack.ExtType(42, obj.to_dict())
raise TypeError('Unknown type: %r' % (obj,))
def ext_hook(code, data):
if code == 42:
p = PasteFile.from_dict(data)
return p
return msgpack.ExtType(code, data)
def msgpack_encoder(value):
""" Encoder used by msgpack serializer when an object is unknown.
This hook is basically used to serialize dates in Arrow objects.
"""
if isinstance(value, arrow.Arrow):
value = msgpack.ExtType(1, value.isoformat().encode())
return value
def msgpack_ext_decoder(code, data):
""" Decoded used by msgpack deserializer when an ext type is found.
This hook is basically used to deserialize dates in Arrow objects.
"""
if code == 1:
try:
return arrow.get(data.decode())
except arrow.parser.ParserError:
return arrow.Arrow.strptime(data.decode(), '%Y%m%dT%H:%M:%S.%f')
return msgpack.ExtType(code, data)
def decode_ext(code, data):
if code == PackDate_ExtType:
values = msgpack.unpackb(data)
return datetime.datetime(*values)
elif code == PackObj_ExtType:
return cPickle.loads(data)
return msgpack.ExtType(code, data)
def encode_ext(obj):
if isinstance(obj, datetime.datetime):
components = (obj.year, obj.month, obj.day, obj.hour, obj.minute,
obj.second, obj.microsecond)
data = msgpack.ExtType(PackDate_ExtType, msgpack.packb(components))
return data
return msgpack.ExtType(
PackObj_ExtType,
cPickle.dumps(obj, protocol=cPickle.HIGHEST_PROTOCOL))
def test_msgpacks_fail_to_load_invalid_data():
def default(ob):
return msgpack.ExtType(127, b"")
with pytest.raises(ValueError):
props.Msgpack().prepare_to_load(None, msgpack.packb(object(), default=default))
def _serialize(cls, value):
if isinstance(value, model.Model):
kind, value = cls.Extensions.Model, cls._entity_to_dict(value)
elif isinstance(value, datetime):
kind, value = cls.Extensions.DateTime, _seconds_since_epoch(value)
else:
raise TypeError(f"Value of type {type(value)} cannot be serialized.")
return msgpack.ExtType(kind, cls._dumps(value))
def test_unpack_ext_type():
def check(b, expected):
assert msgpack.unpackb(b) == expected
check(b'\xd4\x42A', ExtType(0x42, b'A')) # fixext 1
check(b'\xd5\x42AB', ExtType(0x42, b'AB')) # fixext 2
check(b'\xd6\x42ABCD', ExtType(0x42, b'ABCD')) # fixext 4
check(b'\xd7\x42ABCDEFGH', ExtType(0x42, b'ABCDEFGH')) # fixext 8
check(b'\xd8\x42' + b'A'*16, ExtType(0x42, b'A'*16)) # fixext 16
check(b'\xc7\x03\x42ABC', ExtType(0x42, b'ABC')) # ext 8
check(b'\xc8\x01\x23\x42' + b'A'*0x0123,
ExtType(0x42, b'A'*0x0123)) # ext 16
check(b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345,
ExtType(0x42, b'A'*0x00012345)) # ext 32
def __init__(self, file_like=None, read_size=0, use_list=True,
object_hook=None, object_pairs_hook=None, list_hook=None,
encoding=None, unicode_errors='strict', max_buffer_size=0,
ext_hook=ExtType):
if file_like is None:
self._fb_feeding = True
else:
if not callable(file_like.read):
raise TypeError("`file_like.read` must be callable")
self.file_like = file_like
self._fb_feeding = False
self._fb_buffers = []
self._fb_buf_o = 0
self._fb_buf_i = 0
self._fb_buf_n = 0
self._max_buffer_size = max_buffer_size or 2**31-1
if read_size > self._max_buffer_size:
raise ValueError("read_size must be smaller than max_buffer_size")
self._read_size = read_size or min(self._max_buffer_size, 2048)
self._encoding = encoding
self._unicode_errors = unicode_errors
self._use_list = use_list
self._list_hook = list_hook
self._object_hook = object_hook
self._object_pairs_hook = object_pairs_hook
self._ext_hook = ext_hook
if list_hook is not None and not callable(list_hook):
raise TypeError('`list_hook` is not callable')
if object_hook is not None and not callable(object_hook):
raise TypeError('`object_hook` is not callable')
if object_pairs_hook is not None and not callable(object_pairs_hook):
raise TypeError('`object_pairs_hook` is not callable')
if object_hook is not None and object_pairs_hook is not None:
raise TypeError("object_pairs_hook and object_hook are mutually "
"exclusive")
if not callable(ext_hook):
raise TypeError("`ext_hook` is not callable")
def msgpack_default(conn, obj):
return ExtType(0, packb(marshal(conn, obj), use_bin_type=True))
def msgpack_ext_hook(code, data):
if code == 0:
return Reference(unpackb(data, encoding='utf-8'))
return ExtType(code, data)
def __init__(self, file_like=None, read_size=0, use_list=True,
object_hook=None, object_pairs_hook=None, list_hook=None,
encoding=None, unicode_errors='strict', max_buffer_size=0,
ext_hook=ExtType,
max_str_len=2147483647, # 2**32-1
max_bin_len=2147483647,
max_array_len=2147483647,
max_map_len=2147483647,
max_ext_len=2147483647):
if file_like is None:
self._feeding = True
else:
if not callable(file_like.read):
raise TypeError("`file_like.read` must be callable")
self.file_like = file_like
self._feeding = False
#: array of bytes fed.
self._buffer = bytearray()
#: Which position we currently reads
self._buff_i = 0
# When Unpacker is used as an iterable, between the calls to next(),
# the buffer is not "consumed" completely, for efficiency sake.
# Instead, it is done sloppily. To make sure we raise BufferFull at
# the correct moments, we have to keep track of how sloppy we were.
# Furthermore, when the buffer is incomplete (that is: in the case
# we raise an OutOfData) we need to rollback the buffer to the correct
# state, which _buf_checkpoint records.
self._buf_checkpoint = 0
self._max_buffer_size = max_buffer_size or 2**31-1
if read_size > self._max_buffer_size:
raise ValueError("read_size must be smaller than max_buffer_size")
self._read_size = read_size or min(self._max_buffer_size, 16*1024)
self._encoding = encoding
self._unicode_errors = unicode_errors
self._use_list = use_list
self._list_hook = list_hook
self._object_hook = object_hook
self._object_pairs_hook = object_pairs_hook
self._ext_hook = ext_hook
self._max_str_len = max_str_len
self._max_bin_len = max_bin_len
self._max_array_len = max_array_len
self._max_map_len = max_map_len
self._max_ext_len = max_ext_len
self._stream_offset = 0
if list_hook is not None and not callable(list_hook):
raise TypeError('`list_hook` is not callable')
if object_hook is not None and not callable(object_hook):
raise TypeError('`object_hook` is not callable')
if object_pairs_hook is not None and not callable(object_pairs_hook):
raise TypeError('`object_pairs_hook` is not callable')
if object_hook is not None and object_pairs_hook is not None:
raise TypeError("object_pairs_hook and object_hook are mutually "
"exclusive")
if not callable(ext_hook):
raise TypeError("`ext_hook` is not callable")