def msgpack_appendable_pack(o, path):
    """Pack ``o`` into the msgpack file at ``path``, appending when it is a list.

    For a list, the array header at the start of the file is rewritten with
    the combined element count and padded out to
    ``MAX_MSGPACK_ARRAY_HEADER_LEN`` bytes, then the new elements are packed
    one by one at the end of the file. Any other object is packed and written
    at the file's current position.

    :param o: object to pack; lists (including list subclasses) are appended.
    :param path: path of the file to update; it is created when missing.
    """
    # Touch the file so that opening it with 'r+b' below cannot fail because
    # the file does not exist yet.
    open(path, 'a+').close()
    with open(path, mode='r+b') as f:
        packer = msgpack.Packer()
        if not isinstance(o, list):
            f.write(packer.pack(o))
            return
        unpacker = msgpack.Unpacker(f)
        try:
            previous_len = unpacker.read_array_header()
        except msgpack.OutOfData:
            # Nothing to read yet: treat the file as holding an empty array.
            previous_len = 0
        # Calculate and replace the header. It is padded to a fixed width so
        # that a longer header written later never overlaps element data.
        header = packer.pack_array_header(previous_len + len(o))
        f.seek(0)
        f.write(header)
        # Zero-byte padding (the same bytes ``bytes(1)`` yields on Python 3).
        f.write(b'\x00' * (MAX_MSGPACK_ARRAY_HEADER_LEN - len(header)))
        # Append the new elements at the end of the file (whence=2).
        f.seek(0, 2)
        for element in o:
            f.write(packer.pack(element))
python类Packer()的实例源码
def stream(data, writer):
    """Stream the mapping ``data`` to ``writer`` as one msgpack map.

    Keys and plain values are packed with ``msgpack.Packer``. A value that is
    a ``file`` object is not loaded whole: a header built by
    ``msgpackHeader(size)`` is written, followed by the file content read in
    64 KiB chunks. The amount streamed is the smaller of the bytes remaining
    in the file and the value's ``read_bytes`` attribute.

    :param data: dict-like object providing ``iteritems()`` (Python 2).
    :param writer: callable invoked with each chunk of output.
    :returns: the size streamed for the last file value, or 0 when ``data``
        contains no file value (previously this case raised
        ``UnboundLocalError`` at the ``return``).
    """
    packer = msgpack.Packer()
    writer(packer.pack_map_header(len(data)))
    size = 0  # stays 0 when no file object is encountered
    for key, val in data.iteritems():
        writer(packer.pack(key))
        if isinstance(val, file):  # File obj
            max_size = os.fstat(val.fileno()).st_size - val.tell()
            size = min(max_size, val.read_bytes)
            bytes_left = size
            writer(msgpackHeader(size))
            buff = 1024 * 64
            # The body runs at least once, so a zero-sized file still results
            # in one (empty) write, exactly as before.
            while True:
                writer(val.read(min(bytes_left, buff)))
                bytes_left -= buff
                if bytes_left <= 0:
                    break
        else:  # Simple
            writer(packer.pack(val))
    return size
def encode_request(self, rpc_request):
    """Serialize the content of an RPC request with msgpack.

    :param rpc_request: object expected to be a ``protocol.RPCRequest``; its
        ``content`` attribute is what gets packed.
    :returns: ``(True, packed_data)`` on success, or ``(False, None)`` when
        ``rpc_request`` is not an ``RPCRequest`` or packing raises. Both
        failures are reported through ``self._logger.write_log``.
    """
    import protocol
    import msgpack
    if not isinstance(rpc_request, protocol.RPCRequest):
        # Refuse to serialize anything that is not an RPC request.
        self._logger.write_log('can not serlize non-rpc_request obj', 'error')
        return False, None
    try:
        rpc_request_data = msgpack.Packer(use_bin_type=True).pack(rpc_request.content)
    except Exception:
        # Serialization failed. Only ordinary errors are swallowed, so
        # KeyboardInterrupt / SystemExit still propagate.
        self._logger.write_log('rpc request serialize failed', 'error')
        return False, None
    return True, rpc_request_data
def encode_response(self, rpc_response):
    """Serialize the content of an RPC response with msgpack.

    :param rpc_response: object expected to be a ``protocol.RPCResponse``;
        its ``content`` attribute is what gets packed.
    :returns: ``(True, packed_data)`` on success, or ``(False, None)`` when
        ``rpc_response`` is not an ``RPCResponse`` or packing raises. Both
        failures are reported through ``self._logger.write_log``.
    """
    import protocol
    import msgpack
    if not isinstance(rpc_response, protocol.RPCResponse):
        # Refuse to serialize anything that is not an RPC response.
        self._logger.write_log('can not serlize non-rpc_response obj', 'error')
        return False, None
    try:
        rpc_response_data = msgpack.Packer(use_bin_type=True).pack(rpc_response.content)
    except Exception:
        # Serialization failed. Only ordinary errors are swallowed, so
        # KeyboardInterrupt / SystemExit still propagate.
        self._logger.write_log('rpc response serlize failed', 'error')
        return False, None
    return True, rpc_response_data
def _outgoing_connect_cb(self, peer_id, tcp_handle, error):
    """Handle the result of an attempt to connect out to a peer.

    On failure the error is logged and the peer's entry is removed from
    ``self._outgoing``. On success a new ``Outgoing`` record (handle, packer,
    unpacker, the peer's configuration and an empty dict) replaces the entry
    and reading from ``tcp_handle`` is started.
    """
    connected = error is None
    if not connected:
        self._logger.error("unable to establish connction to peer %d", peer_id)
        del self._outgoing[peer_id]
    else:
        packer = msgpack.Packer()
        unpacker = msgpack.Unpacker()
        record = Outgoing(tcp_handle, packer, unpacker, self._conf[peer_id], {})
        self._outgoing[peer_id] = record
        # Bind the peer id so the read callback knows which peer the data is from.
        on_read = partial(self._outgoing_read_cb, peer_id)
        tcp_handle.start_read(on_read)
        self._logger.info("connect to peer %d", peer_id)
def __init__(self, conf, timeout):
    """Store the configuration and start the background request thread.

    :param conf: configuration object, kept as ``self._conf``.
    :param timeout: timeout value, kept as ``self._timeout``.

    Besides initialising the connection and request bookkeeping, this calls
    ``self._patch_client_for_gevent()`` and starts a daemon thread running
    ``self._process_requests_in_background``.
    """
    self._conf = conf
    self._timeout = timeout
    self._unpacker = msgpack.Unpacker()
    self._packer = msgpack.Packer()
    # connection variables
    self._peers = {}  # peer_id -> Peer
    self._sock_to_peer = {}  # socket.connection -> Peer
    self._peers_lock = threading.Lock()  # for _peers and _sock_to_peers
    # request / response variables
    self._req_count = 0
    self._req_count_lock = threading.Lock()
    # For reuse of ValueEvent objects by a thread.
    self._threadlocal = threading.local()
    self._patch_client_for_gevent()
    self._bg_thread = threading.Thread(
        target=self._process_requests_in_background
    )
    # The ``daemon`` attribute replaces the deprecated setDaemon() call; as
    # a daemon, the thread does not keep the interpreter alive on exit.
    self._bg_thread.daemon = True
    self._bg_thread.start()
def req(self, data, timeout):
    """Send ``data`` to the other side and return the first response message.

    The request is packed with msgpack and sent while holding
    ``self._sock_lock``; the socket is then read until the unpacker yields a
    complete message, which is returned.

    :param data: msgpack-serializable request payload.
    :param timeout: accepted but not enforced yet (see the fixme below).
    :raises ValueError: when the socket is closed before a response arrives.
    """
    # fixme: handle timeout
    packer = msgpack.Packer()
    unpacker = msgpack.Unpacker()
    with self._sock_lock:
        # fixme: handle some errors here
        self._sock.sendall(packer.pack(data))
        while True:
            amt = self._sock.recv_into(self._req_buffer, 1000000)
            if amt == 0:
                # A closed connection shows up as a zero-byte read. Testing
                # the buffer object itself (as was done before) says nothing
                # about how many bytes were received, so the loop would
                # never terminate.
                raise ValueError("socket closed fixme: raise real error")
            unpacker.feed(self._req_mv[:amt])
            for m in unpacker:
                # We only expect a single message in response
                # because of the synchronous pattern from sendall.
                return m
def start(self):
    """Serve requests arriving on ``self._sock`` until the peer disconnects.

    Received bytes are fed to a msgpack unpacker; every decoded message is
    passed to ``self._rep_handler`` and the packed result is sent back on the
    same socket. The method returns once ``recv_into`` reports zero bytes.
    """
    packer = msgpack.Packer()
    unpacker = msgpack.Unpacker()
    buf = bytearray(1000000)
    mv = memoryview(buf)  # lets us feed the received prefix without copying
    while True:
        amt = self._sock.recv_into(buf, 1000000)
        if amt == 0:
            # fixme: HANDLE THIS
            # Zero bytes read means the connection was closed. The previous
            # ``if not buf`` check could never fire because ``buf`` is a
            # fixed-size, non-empty bytearray.
            break
        unpacker.feed(mv[:amt])
        for m in unpacker:
            rep = self._rep_handler(m)
            self._sock.sendall(packer.pack(rep))  # fixme: handle error
def __init__(self, io, opts={}):
    """Set up the marshaler around ``io`` with a manually reset msgpack packer.

    ``opts`` is layered on top of a copy of ``MsgPackMarshaler.default_opts``
    and the merged result is handed to ``Marshaler.__init__``. The shared
    default ``{}`` is only read here, never modified.
    """
    self.io = io
    self.packer = msgpack.Packer(autoreset=False)
    merged_opts = MsgPackMarshaler.default_opts.copy()
    merged_opts.update(opts)
    Marshaler.__init__(self, merged_opts)
def msgpack_appendable_unpack(path):
    """Read back the list stored in the appendable msgpack file at ``path``.

    The file is expected to start with an array header padded to
    ``MAX_MSGPACK_ARRAY_HEADER_LEN`` bytes, followed by the packed elements.

    :returns: list with one unpacked object per element announced by the header.
    """
    # TODO (from the original author): handle files that do not hold a list,
    # e.g. by falling back to ``msgpack.unpackb(f.read())``.
    with open(path, 'rb') as stream:
        header_packer = msgpack.Packer()
        unpacker = msgpack.Unpacker(stream, encoding='utf-8')
        count = unpacker.read_array_header()
        # Re-pack the header to learn how many bytes it occupied, then skip
        # the padding that fills the rest of the fixed-width header area.
        header_size = len(header_packer.pack_array_header(count))
        padding = MAX_MSGPACK_ARRAY_HEADER_LEN - header_size
        unpacker.read_bytes(padding)
        stream.seek(MAX_MSGPACK_ARRAY_HEADER_LEN)
        elements = []
        for _ in range(count):
            elements.append(unpacker.unpack())
        return elements
def save_pack(self, fout, encoding='utf-8'):
    """Write the entries of ``self.items()`` to ``fout`` as msgpack records.

    Entries are processed in sorted order; each one is unpacked as an
    ``(id, word, count)`` triple and the word is encoded with ``encoding``
    before the triple is packed.
    """
    packer = msgpack.Packer()
    for entry_id, word, count in sorted(self.items()):
        record = (entry_id, word.encode(encoding), count)
        fout.write(packer.pack(record))
def encode_and_pack(vocab, fin, fout, input_sepline=sepline):
    """Convert each line of ``fin`` to a list of ids and write it packed to ``fout``.

    A line is split into words by ``input_sepline``; the resulting id list is
    ``vocab.sos_id``, then ``vocab.get_id(word)`` for every word, then
    ``vocab.eos_id``. One msgpack array is written per input line.
    """
    packer = msgpack.Packer()
    for line in fin:
        tokens = input_sepline(line)
        ids = [vocab.sos_id] + [vocab.get_id(token) for token in tokens] + [vocab.eos_id]
        fout.write(packer.pack(ids))
def build_test_data(destdir):
    """Write msgpack and cbor "golden" files for each test datum into ``destdir``.

    For the item at index ``i`` of ``get_test_data_list()`` two files are
    produced: ``<i>.msgpack.golden`` (output of ``msgpack.dumps``) and
    ``<i>.cbor.golden`` (output of ``cbor.dumps``).

    :param destdir: directory that receives the generated files.
    """
    for i, item in enumerate(get_test_data_list()):
        serialized = msgpack.dumps(item)
        # ``with`` closes the file even when the write raises.
        with open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb') as f:
            f.write(serialized)
        serialized = cbor.dumps(item)
        with open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb') as f:
            f.write(serialized)
def __init__(self):
    """Import msgpack lazily and keep both the module and a configured packer.

    The packer uses utf-8 encoding, automatic buffer reset and the bin type.
    """
    import msgpack
    packer_options = dict(
        encoding='utf-8',
        autoreset=True,
        use_bin_type=True,
    )
    self.msgpack = msgpack
    self.packer = msgpack.Packer(**packer_options)
def _get_converted_model(self, service, user_version, user_data, config):
    """Build a new ``JubaModel`` for ``service`` from this model and new user data.

    The header and system sections are deep-copied from ``self._m``; the
    system type and (JSON-encoded) config are replaced, and the user section
    is rebuilt from ``user_version`` and ``user_data``.

    :param user_data: sequence of objects exposing ``getvalue()``; their raw
        bytes are concatenated after the array header (presumably each one is
        already msgpack-encoded -- confirm with the caller).
    :returns: the converted model, after ``fix_header()`` has been applied.
    """
    raw_buffer = BytesIO()
    packer = msgpack.Packer()
    # User container layout: a 2-element array holding the version followed
    # by an array with one entry per user_data item.
    raw_buffer.write(packer.pack_array_header(2))
    raw_buffer.write(packer.pack(user_version))
    raw_buffer.write(packer.pack_array_header(len(user_data)))
    for item in user_data:
        raw_buffer.write(item.getvalue())

    # Assemble the transformed model from copies of the current one.
    source = self._m
    converted = JubaModel()
    converted.header = copy.deepcopy(source.header)
    converted.system = copy.deepcopy(source.system)
    converted.system.type = service
    converted.system.config = json.dumps(config)
    converted._user_raw = raw_buffer.getvalue()
    converted.user = JubaModel.UserContainer.loads(converted._user_raw)

    # Recompute CRC32 checksum and field lengths.
    converted.fix_header()
    return converted
def build_test_data(destdir):
    """Write msgpack and cbor "golden" files for each test datum into ``destdir``.

    For the item at index ``i`` of ``get_test_data_list()`` two files are
    produced: ``<i>.msgpack.golden`` (output of ``msgpack.dumps``) and
    ``<i>.cbor.golden`` (output of ``cbor.dumps``).

    :param destdir: directory that receives the generated files.
    """
    for i, item in enumerate(get_test_data_list()):
        serialized = msgpack.dumps(item)
        # ``with`` closes the file even when the write raises.
        with open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb') as f:
            f.write(serialized)
        serialized = cbor.dumps(item)
        with open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb') as f:
            f.write(serialized)
def build_test_data(destdir):
    """Write msgpack and cbor "golden" files for each test datum into ``destdir``.

    For the item at index ``i`` of ``get_test_data_list()`` two files are
    produced: ``<i>.msgpack.golden`` (output of ``msgpack.dumps``) and
    ``<i>.cbor.golden`` (output of ``cbor.dumps``).

    :param destdir: directory that receives the generated files.
    """
    for i, item in enumerate(get_test_data_list()):
        serialized = msgpack.dumps(item)
        # ``with`` closes the file even when the write raises.
        with open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb') as f:
            f.write(serialized)
        serialized = cbor.dumps(item)
        with open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb') as f:
            f.write(serialized)
def __init__(self):
    """Create the encoder's packer/unpacker pair and reset the message id counter."""
    super(MessageEncoder, self).__init__()
    self._next_msgid = 0
    # NOTE: msgpack's wire format carries no notion of text encoding.
    # msgpack-python converts unicode to utf-8 bytes implicitly by default,
    # but that default looked likely to change, so encoding is disabled
    # explicitly rather than relied upon.
    # cf. https://gist.github.com/methane/5022403
    codec_options = {'encoding': None}
    self._packer = msgpack.Packer(**codec_options)
    self._unpacker = msgpack.Unpacker(**codec_options)
def __init__(self, socket, outgoing_msg_sink_iter):
    """Initialise the session for ``socket`` with fresh msgpack state.

    The base class is initialised with the name ``"RpcSession(<socket>)"``;
    message ids start at 0 and ``outgoing_msg_sink_iter`` is stored as given.
    """
    session_name = "RpcSession(%s)" % socket
    super(RpcSession, self).__init__(session_name)
    import msgpack
    self._packer, self._unpacker = msgpack.Packer(), msgpack.Unpacker()
    self._next_msgid = 0
    self._socket = socket
    self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
def testPackUnicode():
    """Round-trip text values with utf-8 through both msgpack APIs.

    Each sample goes through ``packb``/``unpackb`` and then through
    ``Packer``/``Unpacker``; the decoded value must equal the input.
    """
    samples = ["", "abcd", ["defgh"], "??????? ?????"]
    for sample in samples:
        # One-shot helper functions.
        packed = packb(sample, encoding='utf-8')
        decoded = unpackb(packed, use_list=1, encoding='utf-8')
        assert decoded == sample
        # Packer / Unpacker classes reading from a file-like object.
        packer = Packer(encoding='utf-8')
        packed = packer.pack(sample)
        source = BytesIO(packed)
        decoded = Unpacker(source, encoding=str('utf-8'), use_list=1).unpack()
        assert decoded == sample
def testArraySize(sizes=[0, 5, 50, 1000]):
    """Arrays written as header + elements must unpack as ``list(range(size))``."""
    packer = Packer()
    chunks = []
    for size in sizes:
        chunks.append(packer.pack_array_header(size))
        chunks.extend(packer.pack(i) for i in range(size))
    # Same stream content as writing every chunk and rewinding to the start.
    bio = BytesIO(b''.join(chunks))
    unpacker = Unpacker(bio, use_list=1)
    for size in sizes:
        expected = list(range(size))
        assert unpacker.unpack() == expected
def test_manualreset(sizes=[0, 5, 50, 1000]):
    """With ``autoreset=False`` the packer accumulates output until ``reset()``."""
    packer = Packer(autoreset=False)
    for size in sizes:
        packer.pack_array_header(size)
        for value in range(size):
            packer.pack(value)

    # Everything packed so far is still held in the packer's buffer.
    accumulated = packer.bytes()
    unpacker = Unpacker(BytesIO(accumulated), use_list=1)
    for size in sizes:
        expected = list(range(size))
        assert unpacker.unpack() == expected

    packer.reset()
    assert packer.bytes() == b''
def testMapSize(sizes=[0, 5, 50, 1000]):
    """Maps written as header + key/value pairs must unpack as ``{i: i * 2}``."""
    packer = Packer()
    bio = BytesIO()
    for size in sizes:
        bio.write(packer.pack_map_header(size))
        for key in range(size):
            value = key * 2
            bio.write(packer.pack(key))
            bio.write(packer.pack(value))
    bio.seek(0)

    unpacker = Unpacker(bio)
    for size in sizes:
        expected = {i: i * 2 for i in range(size)}
        assert unpacker.unpack() == expected
def test_pairlist():
    """``pack_map_pairs`` output must unpack to the same list of pairs."""
    expected_pairs = [(b'a', 1), (2, b'b'), (b'foo', b'bar')]
    packed = Packer().pack_map_pairs(expected_pairs)
    # object_pairs_hook=list keeps the map as a list of (key, value) pairs.
    roundtrip = unpackb(packed, object_pairs_hook=list)
    assert expected_pairs == roundtrip
def test_pack_ext_type():
    """Check the bytes produced by ``Packer.pack_ext_type`` for type code 0x42."""
    def packed_ext(payload):
        packer = msgpack.Packer()
        packer.pack_ext_type(0x42, payload)
        return packer.bytes()

    # (payload, expected wire bytes)
    cases = [
        (b'A', b'\xd4\x42A'),  # fixext 1
        (b'AB', b'\xd5\x42AB'),  # fixext 2
        (b'ABCD', b'\xd6\x42ABCD'),  # fixext 4
        (b'ABCDEFGH', b'\xd7\x42ABCDEFGH'),  # fixext 8
        (b'A'*16, b'\xd8\x42' + b'A'*16),  # fixext 16
        (b'ABC', b'\xc7\x03\x42ABC'),  # ext 8
        (b'A'*0x0123, b'\xc8\x01\x23\x42' + b'A'*0x0123),  # ext 16
        (b'A'*0x00012345, b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345),  # ext 32
    ]
    for payload, expected in cases:
        assert packed_ext(payload) == expected
def _incoming_connection(self, server, error):
    """Accept a remote client's connection to this Node.

    Nothing happens when ``error`` is set. Otherwise a new ``pyuv.TCP``
    handle is accepted from ``server``, registered in ``self._incoming`` with
    its own packer/unpacker pair, and reading is started on it.
    """
    if error is None:
        client = pyuv.TCP(self._loop)
        server.accept(client)
        packer = msgpack.Packer()
        unpacker = msgpack.Unpacker()
        self._incoming[client] = Incoming(packer, unpacker)
        client.start_read(self._incoming_read)
def __init__(self):
    """Create utf-8 msgpack packer/unpacker objects and reset the message id counter."""
    super(MessageEncoder, self).__init__()
    self._next_msgid = 0
    text_encoding = 'utf-8'
    # The packer additionally emits the msgpack bin type (use_bin_type=True).
    self._packer = msgpack.Packer(encoding=text_encoding, use_bin_type=True)
    self._unpacker = msgpack.Unpacker(encoding=text_encoding)
def __init__(self, sock, outgoing_msg_sink_iter):
    """Initialise the session for a socket and mark it as connected.

    The peer name is taken from ``sock.getpeername()`` and used to build the
    name passed to the base class through ``self.NAME_FMT``.
    """
    # peer_name has to be set before the base initialiser, which is given a
    # name derived from it.
    self.peer_name = str(sock.getpeername())
    session_name = self.NAME_FMT % self.peer_name
    super(RpcSession, self).__init__(session_name)
    text_encoding = 'utf-8'
    self._packer = msgpack.Packer(encoding=text_encoding)
    self._unpacker = msgpack.Unpacker(encoding=text_encoding)
    self._next_msgid = 0
    self._socket = sock
    self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
    self.is_connected = True
def build_test_data(destdir):
    """Write msgpack and cbor "golden" files for each test datum into ``destdir``.

    For the item at index ``i`` of ``get_test_data_list()`` two files are
    produced: ``<i>.msgpack.golden`` (output of ``msgpack.dumps``) and
    ``<i>.cbor.golden`` (output of ``cbor.dumps``).

    :param destdir: directory that receives the generated files.
    """
    for i, item in enumerate(get_test_data_list()):
        serialized = msgpack.dumps(item)
        # ``with`` closes the file even when the write raises.
        with open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb') as f:
            f.write(serialized)
        serialized = cbor.dumps(item)
        with open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb') as f:
            f.write(serialized)
def __init__(self, filename_or_stream):
    """Bind the writer to an output stream and create its msgpack packer.

    :param filename_or_stream: an object with a ``write`` attribute is used
        as the stream directly; anything else is treated as a path and opened
        in binary write mode.
    """
    is_stream = hasattr(filename_or_stream, 'write')
    self.stream = filename_or_stream if is_stream else open(filename_or_stream, 'wb')
    # NOTE(review): ``encoding`` is not a parameter of this method, so it is
    # presumably a module-level name -- confirm it is defined.
    self.packer = msgpack.Packer(encoding=encoding)