mapped_struct.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:sharedbuffers 作者: jampp 项目源码 文件源码
def pack_into(cls, obj, buf, offs, idmap = None, implicit_offs = 0):
        """Write the byte string *obj* into *buf* at *offs*, with a length header.

        Layout written at ``offs``:

        * a 16-bit header word: the low 15 bits hold the payload length and
          the top bit (``0x8000``) is set when the payload is lz4-compressed;
        * when the payload length is ``>= 0x7FFF``, the low bits are set to
          the escape value ``0x7FFF`` and the real length follows (an
          unsigned 64-bit value in the pure-Python path, format ``'=HQ'``);
        * the payload bytes themselves.

        Payloads longer than ``MIN_COMPRESS_THRESHOLD`` are run through
        ``lz4_compress``; the compressed form is only kept when it is
        smaller than roughly two thirds of the original.

        Args:
            obj: The payload. Must support ``len()`` and be assignable into
                a slice of ``buf``.
            buf: Writable destination buffer.
            offs: Offset in ``buf`` at which the header starts.
            idmap: Optional mapping; when given, ``id(obj)`` is mapped to
                ``offs + implicit_offs`` (recorded before anything is
                written).
            implicit_offs: Added to ``offs`` for the ``idmap`` entry only.

        Returns:
            The offset just past the last payload byte written.

        Raises:
            struct.error: If ``offs + 16 + payload_length`` exceeds
                ``len(buf)``.

        NOTE(review): ``pybuf`` and ``pbuf`` are used without a visible
        declaration, so they are presumably typed through ``cython.locals``
        or a ``.pxd`` outside this view -- keep the local names unchanged.
        """
        if idmap is not None:
            objid = id(obj)
            idmap[objid] = offs + implicit_offs
        objlen = len(obj)

        if objlen > MIN_COMPRESS_THRESHOLD:
            objcomp = lz4_compress(obj)
            objcomplen = len(objcomp)
            # Floor division keeps the threshold an integer on every Python
            # version / Cython language level; for integer lengths the
            # accept/reject decision is identical to the true-division form.
            if objcomplen < (objlen - objlen // 3):
                # Must get substantial compression to pay the price
                obj = objcomp
                objlen = objcomplen
                compressed = 0x8000
            else:
                compressed = 0
            del objcomp
        else:
            compressed = 0

        # 16 bytes of headroom are reserved for the header.
        # NOTE(review): presumably an upper bound on the largest header
        # variant -- confirm against the _varstr_header declaration.
        if (offs + 16 + len(obj)) > len(buf):
            raise struct.error('buffer too small')
        if cython.compiled:
            # Acquire the buffer view *before* entering the try block, so
            # that PyBuffer_Release in the finally clause only ever runs on
            # a view that was successfully obtained.
            buf = _likebuffer(buf)
            PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_WRITABLE)  # lint:ok
            try:
                pbuf = cython.cast(cython.p_char, pybuf.buf) + offs  # lint:ok

                if objlen < 0x7FFF:
                    # Short form: length fits in the 15 low bits.
                    cython.cast('_varstr_header *', pbuf).shortlen = objlen | compressed
                    offs += cython.sizeof(cython.ushort)
                    pbuf += cython.sizeof(cython.ushort)
                else:
                    # Long form: escape value in shortlen, real length in biglen.
                    cython.cast('_varstr_header *', pbuf).shortlen = 0x7FFF | compressed
                    cython.cast('_varstr_header *', pbuf).biglen = objlen
                    offs += cython.sizeof('_varstr_header')
                    pbuf += cython.sizeof('_varstr_header')
                memcpy(pbuf, cython.cast(cython.p_char, obj), objlen)  # lint:ok
            finally:
                PyBuffer_Release(cython.address(pybuf))  # lint:ok
        else:
            if objlen < 0x7FFF:
                # Short form: a single unsigned 16-bit word.
                hpacker = struct.Struct('=H')
                hpacker.pack_into(buf, offs, objlen | compressed)
                offs += hpacker.size
            else:
                # Long form: escape word followed by the unsigned 64-bit length.
                qpacker = struct.Struct('=HQ')
                qpacker.pack_into(buf, offs, 0x7FFF | compressed, objlen)
                offs += qpacker.size
            buf[offs:offs+objlen] = obj
        offs += objlen
        return offs
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号