test_zmq_pub_sub.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码
def test_pub(self):
        """Publish log messages. bind() to PUB socket."""
        # pylint: disable=E1101
        context = zmq.Context()
        pub = context.socket(zmq.PUB)
        try:
            pub.bind('tcp://*:{}'.format(self.sub_port))
        except zmq.ZMQError as error:
            print(error)
        time.sleep(0.1)

        send_count = self.send_count
        for i in range(send_count):
            pub.send_string('hi there {}'.format(i))
            time.sleep(1e-5)
        sys.stdout.flush()

        # Wait for the watcher thread to exit.
        while self.watcher.isAlive():
            self.watcher.join(timeout=1e-5)

        pub.close()
        context.term()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号