def start(self, socket):
    """
    Start the monitoring thread and socket.

    Does nothing when monitoring is already active (``self.monitor_listening`` is set).
    Otherwise a monitor socket is requested from ``socket`` for ``self.events``, its
    receive timeout is set to ``self.receive_timeout``, and a daemon thread is started.
    The thread passes every received monitor event to ``self._notify_listeners`` and
    calls ``self._notify_listeners(None)`` whenever a receive times out.

    The thread stops once ``self.monitor_listening`` is cleared or a ``zmq.EVENT_CLOSED``
    event is received. On exit - including exit caused by an unexpected exception - the
    flag is cleared and the monitor socket is disabled and closed, so that ``start`` can
    be called again.

    :param socket: Socket to monitor.
    """
    # Start a thread only if it is not already running.
    if self.monitor_listening.is_set():
        return

    # Setup monitor socket.
    monitor_socket = socket.get_monitor_socket(events=self.events)
    monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)

    self.monitor_listening.set()

    def event_listener(monitor_listening):
        try:
            while monitor_listening.is_set():
                try:
                    event = recv_monitor_message(monitor_socket)
                # In case the receive cannot be completed before the timeout.
                except zmq.Again:
                    # Heartbeat for listeners - we do not need an additional thread
                    # for time based listeners.
                    self._notify_listeners(None)
                    continue

                # The socket is closed, just stop listening now.
                if event["event"] == zmq.EVENT_CLOSED:
                    monitor_listening.clear()

                self._notify_listeners(event)
        finally:
            # Without this, an exception raised by the receive call or by a listener
            # would leave the flag set and every later start() would return early.
            monitor_listening.clear()
            # Cleanup monitor socket. The close must happen even if disabling fails.
            try:
                socket.disable_monitor()
            finally:
                monitor_socket.close()

    # Daemon thread: in case someone does not call disconnect, this will stop the
    # thread anyway when the interpreter exits.
    self.monitor_thread = threading.Thread(
        target=event_listener,
        args=(self.monitor_listening,),
        daemon=True,
    )
    self.monitor_thread.start()
评论列表
文章目录