test_message.py 文件源码

python
阅读 35 收藏 0 点赞 0 评论 0

项目:trex-http-proxy 作者: alwye 项目源码 文件源码
def test_noncopying_recv(self):
        """check for clobbering message buffers"""
        null = b'\0'*64
        sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        for i in range(32):
            # try a few times
            sb.send(null, copy=False)
            m = sa.recv(copy=False)
            mb = m.bytes
            # buf = view(m)
            buf = m.buffer
            del m
            for i in range(5):
                ff=b'\xff'*(40 + i*10)
                sb.send(ff, copy=False)
                m2 = sa.recv(copy=False)
                if view.__name__ == 'buffer':
                    b = bytes(buf)
                else:
                    b = buf.tobytes()
                self.assertEqual(b, null)
                self.assertEqual(mb, null)
                self.assertEqual(m2.bytes, ff)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号