test_monitor.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:zanph 作者: zanph 项目源码 文件源码
def test_monitor(self):
        """Test monitoring interface for sockets."""
        s_rep = self.context.socket(zmq.REP)
        s_req = self.context.socket(zmq.REQ)
        self.sockets.extend([s_rep, s_req])
        s_req.bind("tcp://127.0.0.1:6666")
        # try monitoring the REP socket

        s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
        # create listening socket for monitor
        s_event = self.context.socket(zmq.PAIR)
        self.sockets.append(s_event)
        s_event.connect("inproc://monitor.rep")
        s_event.linger = 0
        # test receive event for connect event
        s_rep.connect("tcp://127.0.0.1:6666")
        m = recv_monitor_message(s_event)
        if m['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
            # test receive event for connected event
            m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")

        # test monitor can be disabled.
        s_rep.disable_monitor()
        m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号