def test_pub(self):
"""Publish log messages. bind() to PUB socket."""
# pylint: disable=E1101
context = zmq.Context()
pub = context.socket(zmq.PUB)
try:
pub.bind('tcp://*:{}'.format(self.sub_port))
except zmq.ZMQError as error:
print(error)
time.sleep(0.1)
send_count = self.send_count
for i in range(send_count):
pub.send_string('hi there {}'.format(i))
time.sleep(1e-5)
sys.stdout.flush()
# Wait for the watcher thread to exit.
while self.watcher.isAlive():
self.watcher.join(timeout=1e-5)
pub.close()
context.term()
test_zmq_pub_sub.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录