def __init__(self, ctx, url, topics=(), block_until_connected=True):
self.socket = zmq.Socket(ctx, zmq.SUB)
assert type(topics) != str
if block_until_connected:
# connect node and block until a connecetion has been made
monitor = self.socket.get_monitor_socket()
self.socket.connect(url)
while True:
status = recv_monitor_message(monitor)
if status['event'] == zmq.EVENT_CONNECTED:
break
elif status['event'] == zmq.EVENT_CONNECT_DELAYED:
pass
else:
raise Exception("ZMQ connection failed")
self.socket.disable_monitor()
else:
self.socket.connect(url)
for t in topics:
self.subscribe(t)
评论列表
文章目录