tools.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:mflow 作者: datastreaming 项目源码 文件源码
def start(self, socket):
        """
        Start the monitoring thread and socket.

        Does nothing when ``self.monitor_listening`` is already set. Otherwise a
        monitor socket is obtained from ``socket``, its receive timeout is set to
        ``self.receive_timeout`` and a daemon thread is started that forwards every
        received monitor event to ``self._notify_listeners``. When a receive times
        out, the listeners are notified with ``None`` instead (heartbeat).

        The thread stops once ``self.monitor_listening`` is cleared, either by
        someone else or by the thread itself when it sees ``zmq.EVENT_CLOSED``.
        The monitor socket is released when the thread ends, also if it ends
        because of an unexpected exception.

        :param socket: Socket to monitor.
        """
        # Start a thread only if it is not already running.
        if self.monitor_listening.is_set():
            return

        # Setup monitor socket.
        monitor_socket = socket.get_monitor_socket(events=self.events)
        # The receive timeout is what turns a quiet socket into periodic zmq.Again
        # exceptions, which the listener thread uses as a heartbeat.
        monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
        self.monitor_listening.set()

        def event_listener(monitor_listening):
            try:
                while monitor_listening.is_set():
                    try:
                        event = recv_monitor_message(monitor_socket)
                        # The socket is closed, just stop listening now.
                        if event["event"] == zmq.EVENT_CLOSED:
                            monitor_listening.clear()

                        self._notify_listeners(event)
                    # In case the receive cannot be completed before the timeout.
                    except zmq.Again:
                        # Heartbeat for listeners - we do not need an additional thread for time based listeners.
                        self._notify_listeners(None)
            finally:
                # The flag must not stay set if the thread dies from an unexpected
                # exception, otherwise start() would refuse to ever run again.
                monitor_listening.clear()

                # Cleanup monitor socket. Nested so the monitor socket is closed
                # even when disabling the monitor on the original socket fails.
                try:
                    socket.disable_monitor()
                finally:
                    monitor_socket.close()

        self.monitor_thread = threading.Thread(target=event_listener, args=(self.monitor_listening,))
        # In case someone does not call disconnect, this will stop the thread anyway.
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号