zmq_test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_change_subscription(self):
        """Check that changing a SUB socket's subscriptions mid-stream takes effect.

        A PUB/SUB pair is obtained from ``self.create_bound_pair`` and the
        subscriber initially subscribes to ``b'test'``.  Two greenthreads are
        spawned:

        * ``tx`` publishes ``test BEGIN`` repeatedly until the receiver reports
          it is ready, then ``test 1`` .. ``test 100`` (with ``test LAST`` sent
          in place of ``test 50``), and finally ``done DONE``.
        * ``rx`` counts every received message that is neither ``test BEGIN``
          nor ``done DONE``.  On ``test LAST`` it subscribes to ``b'done'`` and
          unsubscribes from ``b'test'``.

        The test asserts the count is 50, which is what results when
        ``test 1`` .. ``test 49`` and ``test LAST`` are all delivered and none
        of ``test 51`` .. ``test 100`` are.
        """
        # FIXME: Extensive testing showed this particular test is the root cause
        # of sporadic failures on Travis.
        pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
        # Initial filter: only messages starting with b'test' should be delivered
        # (ZeroMQ subscriptions match on message prefix).
        sub.setsockopt(zmq.SUBSCRIBE, b'test')
        # Yield once; presumably lets the subscription be processed before
        # the greenthreads start - TODO confirm.
        eventlet.sleep(0)
        # Synchronisation points between rx(), tx() and the test body:
        #   sub_ready - sent by rx() after the first 'test BEGIN' is received
        #   sub_last  - sent by rx() after it has switched subscriptions
        #   sub_done  - sent by rx() carrying the final message count
        sub_ready = eventlet.Event()
        sub_last = eventlet.Event()
        sub_done = eventlet.Event()

        def rx():
            # Receiver: wait for the first BEGIN marker, then count messages
            # until 'done DONE' arrives.
            while sub.recv() != b'test BEGIN':
                eventlet.sleep(0)
            sub_ready.send()
            count = 0
            while True:
                msg = sub.recv()
                if msg == b'test BEGIN':
                    # BEGIN may come many times
                    continue
                if msg == b'test LAST':
                    # Switch topics: start listening for 'done', stop
                    # listening for 'test'.
                    sub.setsockopt(zmq.SUBSCRIBE, b'done')
                    sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
                    eventlet.sleep(0)
                    # In real application you should either sync
                    # or tolerate loss of messages.
                    sub_last.send()
                    # No `continue` here: LAST itself falls through and is
                    # included in the count below.
                if msg == b'done DONE':
                    break
                count += 1
            # Hand the final count to the waiting test body.
            sub_done.send(count)

        def tx():
            # Sender: publish the BEGIN handshake, the numbered messages and
            # the final DONE marker.
            # Sync receiver ready to avoid loss of first packets
            while not sub_ready.ready():
                pub.send(b'test BEGIN')
                eventlet.sleep(0.005)
            for i in range(1, 101):
                msg = 'test {0}'.format(i).encode()
                if i != 50:
                    pub.send(msg)
                else:
                    # Message 50 is replaced by the LAST marker; block until
                    # rx() has changed its subscriptions before sending more.
                    pub.send(b'test LAST')
                    sub_last.wait()
                # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
                # just yield eventlet.sleep(0) doesn't cut it
                eventlet.sleep(0.001)
            pub.send(b'done DONE')

        eventlet.spawn(rx)
        eventlet.spawn(tx)
        # Blocks until rx() sends its count; the value passed to send() is
        # what wait() returns here.
        rx_count = sub_done.wait()
        self.assertEqual(rx_count, 50)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号