def msgpack_appendable_pack(o, path):
open(path, 'a+').close() # touch
with open(path, mode='r+b') as f:
packer = msgpack.Packer()
unpacker = msgpack.Unpacker(f)
if type(o) == list:
try:
previous_len = unpacker.read_array_header()
except msgpack.OutOfData:
previous_len = 0
# calculate and replace header
header = packer.pack_array_header(previous_len + len(o))
f.seek(0)
f.write(header)
f.write(bytes(1) * (MAX_MSGPACK_ARRAY_HEADER_LEN - len(header)))
# append new elements
f.seek(0, 2)
for element in o:
f.write(packer.pack(element))
else:
f.write(packer.pack(o))
评论列表
文章目录