def _connect(self):
# Subscribe to the hello topic - once we recieve a hello we'll send a request
# for real data. The callback plugin will effectively block execution until we
# Send this request
self.socket.setsockopt(zmq.SUBSCRIBE, 'hello')
# Define the control socket for responding to the 'hello' topic
control_socket = self.context.socket(zmq.REQ)
control_socket.connect(self._env['DAUBER_CONTROL_SOCKET_URI'])
timeout = 500
t_last = time.time()
while (time.time() - t_last) < timeout:
ready = dict(self.poller.poll())
if ready.get(self.socket):
topic, _ = self.socket.recv_multipart()
if topic == 'hello':
# Signal that we've connected and we're ready to recieve data
control_socket.send(b'')
control_socket.recv()
break
assert (time.time() - t_last) < timeout, \
"Timed out before recieving a hello topic message from the publisher."
del control_socket
self.socket.setsockopt(zmq.UNSUBSCRIBE, 'hello')
评论列表
文章目录