def pack_into(cls, obj, buf, offs, idmap=None, implicit_offs=0):
    """Write ``obj`` into ``buf`` at ``offs`` as a length-prefixed byte string.

    Data written, starting at ``offs``:

    * a 16-bit header holding the payload length when it is below
      ``0x7FFF``, or the marker ``0x7FFF`` otherwise, OR-ed with ``0x8000``
      when the payload is stored compressed;
    * only after the ``0x7FFF`` marker, the full payload length (a 64-bit
      unsigned value in the pure-Python path, struct format ``'=HQ'``);
    * the payload bytes themselves.

    Payloads longer than ``MIN_COMPRESS_THRESHOLD`` are passed through
    ``lz4_compress``; the compressed form replaces the payload only when it
    is smaller than ``objlen - objlen // 3`` (about two thirds of the
    original size).

    Args:
        cls: Not used by the body. Presumably supplied by a ``classmethod``
            decorator outside this view -- TODO confirm.
        obj: Byte string to store.
        buf: Writable buffer that receives the header and payload.
        offs: Offset into ``buf`` at which writing starts.
        idmap: Optional mapping; when given, ``id(obj)`` is mapped to
            ``offs + implicit_offs`` before anything is written.
        implicit_offs: Value added to ``offs`` for the ``idmap`` entry only.

    Returns:
        The offset in ``buf`` immediately after the written payload.

    Raises:
        struct.error: If ``offs + 16 + len(payload)`` exceeds ``len(buf)``.
    """
    # NOTE(review): local names are left unchanged on purpose; they may be
    # typed by a cython.locals decorator that is outside this view.
    if idmap is not None:
        objid = id(obj)
        idmap[objid] = offs + implicit_offs
    objlen = len(obj)

    if objlen > MIN_COMPRESS_THRESHOLD:
        objcomp = lz4_compress(obj)
        objcomplen = len(objcomp)
        # Floor division keeps the threshold integral in every mode; for
        # integer lengths it accepts exactly the same sizes as `/` does.
        if objcomplen < (objlen - objlen // 3):
            # Must get substantial compression to pay the price
            obj = objcomp
            objlen = objcomplen
            compressed = 0x8000
        else:
            compressed = 0
        del objcomp
    else:
        compressed = 0

    # 16 is a conservative allowance for the header (the largest pure-Python
    # header, '=HQ', is 10 bytes).
    if (offs + 16 + len(obj)) > len(buf):
        raise struct.error('buffer too small')

    if cython.compiled:
        # Acquire the buffer *before* entering the try block so that the
        # release in `finally` only ever runs for a view that was actually
        # obtained by this call.
        buf = _likebuffer(buf)
        PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_WRITABLE)  # lint:ok
        try:
            pbuf = cython.cast(cython.p_char, pybuf.buf) + offs  # lint:ok

            # NOTE(review): _varstr_header is assumed to match the '=HQ'
            # layout used by the pure-Python path -- confirm at its
            # declaration.
            if objlen < 0x7FFF:
                cython.cast('_varstr_header *', pbuf).shortlen = objlen | compressed
                offs += cython.sizeof(cython.ushort)
                pbuf += cython.sizeof(cython.ushort)
            else:
                cython.cast('_varstr_header *', pbuf).shortlen = 0x7FFF | compressed
                cython.cast('_varstr_header *', pbuf).biglen = objlen
                offs += cython.sizeof('_varstr_header')
                pbuf += cython.sizeof('_varstr_header')
            memcpy(pbuf, cython.cast(cython.p_char, obj), objlen)  # lint:ok
        finally:
            PyBuffer_Release(cython.address(pybuf))  # lint:ok
    else:
        if objlen < 0x7FFF:
            hpacker = struct.Struct('=H')
            hpacker.pack_into(buf, offs, objlen | compressed)
            offs += hpacker.size
        else:
            qpacker = struct.Struct('=HQ')
            qpacker.pack_into(buf, offs, 0x7FFF | compressed, objlen)
            offs += qpacker.size
        buf[offs:offs+objlen] = obj
    # Both paths have advanced `offs` past the header only; skip the payload.
    offs += objlen
    return offs
评论列表
文章目录