def test_recv_multipart(self):
    """Check that a pending ``recv_multipart`` completes once a frame is sent.

    The receive is started on the second endpoint before anything has been
    sent, so the returned awaitable must still be pending. After the first
    endpoint sends ``b'hi'``, awaiting it must yield the one-frame list
    ``[b'hi']``.
    """
    # Native coroutine instead of ``@asyncio.coroutine`` + ``yield from``:
    # generator-based coroutines were deprecated in Python 3.8 and removed
    # in 3.11, where the decorator no longer exists.
    async def test():
        # presumably ``a`` is the PUSH end and ``b`` the PULL end, matching
        # the argument order -- confirm against ``create_bound_pair``.
        a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
        # Start receiving first so the pending state can be observed.
        f = b.recv_multipart()
        # Use the TestCase assertion (consistent with assertEqual below);
        # a bare ``assert`` would be stripped when running with ``-O``.
        self.assertFalse(f.done())
        await a.send(b'hi')
        recvd = await f
        self.assertEqual(recvd, [b'hi'])

    self.loop.run_until_complete(test())
评论列表
文章目录