def test_pub(self):
"""Publish log messages. connect() to PUB socket."""
# pylint: disable=E1101
context = zmq.Context()
pub = context.socket(zmq.PUB)
try:
_address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
pub.connect(_address)
except zmq.ZMQError as error:
print('ERROR:', error)
time.sleep(0.1)
send_count = self.send_count
for i in range(send_count):
pub.send_string('hi there {}'.format(i))
time.sleep(1e-5)
# Wait for the watcher thread to exit
while self.watcher.isAlive():
self.watcher.join(timeout=1e-5)
pub.close()
context.term()
test_zmq_pub_sub.py 文件源码
python
阅读 25
收藏 0
点赞 0
评论 0
评论列表
文章目录