def recv_messages(zmq_subscriber, timeout_count, message_count):
    """Test utility: receive and count ZMQ messages from a subscriber.

    Polls the socket with non-blocking receives. The loop stops as soon as
    ``message_count`` messages have been received, or once ``timeout_count``
    receives in a row have failed with EAGAIN. A successful receive resets
    the failure counter.

    Args:
        zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
        timeout_count (int): No. of consecutive failed receives until exit.
            A value <= 0 returns immediately without touching the socket.
        message_count (int): No. of messages expected to be received.

    Returns:
        (int) Number of messages received.

    Raises:
        zmq.ZMQError: Any receive error whose errno is not EAGAIN.
    """
    # pylint: disable=E1101
    fails = 0  # No. of consecutive receives that didn't return a message.
    receive_count = 0  # Total number of messages received.
    while fails < timeout_count:
        # Only the receive itself can raise ZMQError, so it is the only
        # statement guarded by the try; the payload is not needed.
        try:
            zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
        except zmq.ZMQError as error:
            if error.errno != zmq.EAGAIN:
                raise
            # EAGAIN from a non-blocking receive: count the miss and pause
            # briefly before polling again.
            fails += 1
            time.sleep(1e-6)
        else:
            fails = 0
            receive_count += 1
            if receive_count == message_count:
                break
    return receive_count
test_zmq_pub_sub.py 文件源码
python
阅读 18
收藏 0
点赞 0
评论 0
评论列表
文章目录