async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """Check that an azmq PUSH socket delivers a multipart message over TCP.

    A ``zmq.PULL`` socket created through ``socket_factory`` acts as the
    receiving peer; an ``azmq.PUSH`` socket sends ``[b'hello', b'world']`` to
    it over ``tcp://127.0.0.1:3333``.

    This must be a coroutine function because the body uses ``async with``
    and ``await``.
    NOTE(review): presumably collected via pytest-asyncio (a
    ``@pytest.mark.asyncio`` marker or ``asyncio_mode = auto``) -- confirm
    against the surrounding file/configuration.

    Args:
        event_loop: Event loop handed to ``azmq.Context``.
        socket_factory: Fixture exposing ``create(socket_type)``.
        connect_or_bind: Fixture called as ``connect_or_bind(socket, endpoint)``
            for the sender and with ``reverse=True`` for the receiver.
            NOTE(review): presumably ``reverse=True`` picks the opposite role
            (bind vs. connect) -- confirm against the fixture.
    """
    # Single source of truth so the sender and the receiver cannot drift apart.
    endpoint = 'tcp://127.0.0.1:3333'
    payload = [b'hello', b'world']

    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, endpoint, reverse=True)

    def run():
        # Wait up to 1000 ms for the socket to become readable, then read the
        # message and compare it with what was sent.
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == payload

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, endpoint)
            await socket.send_multipart(payload)

            # Keep the azmq context open, yielding to the event loop, until
            # the event is set.
            # NOTE(review): presumably the event is set once `run` has
            # finished -- confirm against run_in_background.
            while not event.is_set():
                await asyncio.sleep(0.1)
评论列表
文章目录