test_interoperability.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:azmq 作者: ereOn 项目源码 文件源码
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """Check that an ``azmq`` SUB socket interoperates with a ``zmq`` XPUB peer over TCP.

    A ``zmq.XPUB`` socket is set up on ``tcp://127.0.0.1:3333`` and driven
    from ``run`` (executed through ``run_in_background``), while the
    ``azmq.SUB`` socket under test subscribes to topic ``b'a'`` and must
    receive the published ``[b'a', b'message']`` frames within one second.

    This must be a coroutine function: the body uses ``async with`` and
    ``await``, which are a ``SyntaxError`` inside a plain ``def``.

    NOTE(review): a coroutine test presumably needs an asyncio test marker
    (e.g. ``@pytest.mark.asyncio``) or an equivalent auto mode to be
    collected and awaited -- confirm against the project's test setup.

    :param event_loop: Event loop passed to ``azmq.Context``.
    :param socket_factory: Fixture exposing ``create(socket_type)``.
    :param connect_or_bind: Fixture called as
        ``connect_or_bind(socket, endpoint, reverse=...)``; presumably
        connects or binds the socket, with ``reverse=True`` selecting the
        opposite operation -- confirm against the fixture definition.
    """
    endpoint = 'tcp://127.0.0.1:3333'

    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, endpoint, reverse=True)

    def run():
        """Drive the XPUB side: await the subscription, then publish."""
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        # XPUB surfaces a subscription as a 0x01 byte followed by the topic.
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is also invoked as a callable
        # above; this comparison only holds if the fixture object compares
        # equal to the string 'connect' -- verify against the fixture.
        if connect_or_bind == 'connect':
            # An unsubscription is surfaced as a 0x00 byte followed by the
            # topic.
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, endpoint)

            # Bound the wait so a lost message fails the test instead of
            # hanging it.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号