async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """Check an ``azmq.XPUB`` socket against a ``zmq.SUB`` peer over TCP.

    The test verifies three things, in order:

    1. the XPUB socket receives ``b'\\1a'`` once the SUB peer (subscribed to
       topic ``b'a'``) is attached;
    2. a message published on topic ``b'a'`` reaches the SUB peer as
       ``[b'a', b'message']``;
    3. the XPUB socket receives ``b'\\0a'`` after the SUB peer is closed.

    The function must be a coroutine because its body uses ``await`` and
    ``async with``.

    NOTE(review): running a coroutine test requires an asyncio-aware pytest
    plugin/marker (presumably pytest-asyncio, given the ``event_loop``
    fixture) -- confirm it is applied to this test.

    :param event_loop: Event loop handed to ``azmq.Context``.
    :param socket_factory: Factory exposing ``create(socket_type)``; used to
        build the ``zmq.SUB`` peer.
    :param connect_or_bind: Helper that attaches a socket to an endpoint;
        ``reverse=True`` is passed for the SUB peer.
    """
    endpoint = 'tcp://127.0.0.1:3333'

    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, endpoint, reverse=True)

    def run():
        # Blocking receive, hence executed outside the event loop through
        # run_in_background.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, endpoint)

            # First frame seen by XPUB: 0x01 prefix + topic (subscription).
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the background receiver reports it is
            # done; the b'b' message must never match the assertion in
            # run(), which only accepts topic b'a'.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Closing the peer is part of the scenario: it is what produces
            # the 0x00-prefixed frame (unsubscription) checked below.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
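# Web-page residue from extraction (not part of the test): "Comment list"
# Web-page residue from extraction (not part of the test): "Article table of contents"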