def _unpack_bytes_from_pybuffer(buf, offs, idmap):
if idmap is not None and offs in idmap:
return idmap[offs]
if cython.compiled:
try:
buf = _likebuffer(buf)
PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_SIMPLE) # lint:ok
rv = _unpack_bytes_from_cbuffer(cython.cast(cython.p_char, pybuf.buf), offs, pybuf.len, None) # lint:ok
finally:
PyBuffer_Release(cython.address(pybuf)) # lint:ok
else:
hpacker = struct.Struct('=H')
objlen = hpacker.unpack_from(buf, offs)[0]
offs = int(offs)
dataoffs = offs + hpacker.size
compressed = (objlen & 0x8000) != 0
if (objlen & 0x7FFF) == 0x7FFF:
qpacker = struct.Struct('=HQ')
objlen = qpacker.unpack_from(buf, offs)[1]
dataoffs = offs + qpacker.size
else:
objlen = objlen & 0x7FFF
rv = buffer(buf, dataoffs, objlen)
if compressed:
rv = lz4_decompress(rv)
else:
rv = bytes(rv)
if idmap is not None:
idmap[offs] = rv
return rv
评论列表
文章目录