python类ROUTER的实例源码

monitoredqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Shuttle traffic between two sockets forever, involving a monitor socket.

    Both ``in_socket`` and ``out_socket`` are polled for incoming data.
    Whenever one of them is ready, it is handed to ``_relay`` together with
    the opposite socket, ``mon_socket`` and the prefix for that direction
    (``in_prefix`` for data read from ``in_socket``, ``out_prefix`` for data
    read from ``out_socket``).

    The loop has no exit condition, so this function only stops when an
    exception propagates out of polling or relaying.
    """
    # True only when both ends are ROUTER sockets; forwarded to _relay as-is.
    # NOTE(review): presumably tells _relay to swap the leading identity
    # frames -- confirm against _relay, which is defined elsewhere.
    both_routers = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER

    watcher = zmq.Poller()
    for sock in (in_socket, out_socket):
        watcher.register(sock, zmq.POLLIN)

    # (source, destination, prefix) for each direction, in-side first.
    routes = (
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    )

    while True:
        ready = dict(watcher.poll())
        for source, destination, prefix in routes:
            if source in ready:
                _relay(source, destination, mon_socket, prefix, both_routers)
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_term_hang(self):
        """Terminate the context after sending on a DEALER with LINGER set to 0.

        The test passes if ``self.context.term()`` returns; there are no
        explicit assertions.
        """
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        # NOTE(review): LINGER=0 presumably lets the unread message be dropped
        # on close so term() cannot block -- confirm against ZeroMQ docs.
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        # Sender is closed first, then its peer.
        for sock in (dealer, router):
            sock.close()
        self.context.term()
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built from the three socket types only.

        Nothing beyond the socket types is passed to the constructor, so the
        device runs with whatever defaults it defines.
        """
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        # Stored on the test case; presumably teardown_device() reads
        # self.device -- confirm.
        self.device = queue

        # Zero LINGER on the in, out and monitor sides.
        queue.setsockopt_in(zmq.LINGER, 0)
        queue.setsockopt_out(zmq.LINGER, 0)
        queue.setsockopt_mon(zmq.LINGER, 0)

        # Per the original author's note: this will raise if the default
        # args are wrong.
        queue.start()
        self.teardown_device()
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_mq_check_prefix(self):
        """Expect TypeError when unicode strings are handed to devices.monitoredqueue.

        Three sockets (ROUTER, DEALER, PUB) are created and appended to
        ``self.sockets``; only the PUB socket is actually passed to the call.
        """
        created = [
            self.context.socket(kind)
            for kind in (zmq.ROUTER, zmq.DEALER, zmq.PUB)
        ]
        self.sockets.extend(created)
        mons = created[-1]

        # NOTE(review): the unicode values occupy the first two positional
        # arguments, where the ROUTER/DEALER sockets would be expected, and
        # no prefix arguments are passed. Given the test name this looks
        # unintended -- confirm what the call is meant to exercise.
        text_in = unicode('in')
        text_out = unicode('out')
        self.assertRaises(TypeError, devices.monitoredqueue, text_in, text_out, mons)
monitoredqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Run an endless two-way relay between ``in_socket`` and ``out_socket``.

    Each pass blocks in ``poller.poll()`` and then, for every socket that is
    reported ready, calls ``_relay`` with that socket as the source, the
    other socket as the destination, ``mon_socket``, and the matching prefix
    (``in_prefix`` / ``out_prefix``).

    There is no return path: the function ends only if an exception is
    raised while polling or relaying.
    """
    # Forwarded to _relay unchanged; set only for a ROUTER-to-ROUTER pairing.
    swap_ids = (in_socket.type == zmq.ROUTER) and (out_socket.type == zmq.ROUTER)

    poller = zmq.Poller()
    poller.register(in_socket, zmq.POLLIN)
    poller.register(out_socket, zmq.POLLIN)

    def pump(ready, source, sink, prefix):
        # Relay one direction, but only if its source socket was reported ready.
        if source in ready:
            _relay(source, sink, mon_socket, prefix, swap_ids)

    while True:
        ready = dict(poller.poll())
        pump(ready, in_socket, out_socket, in_prefix)
        pump(ready, out_socket, in_socket, out_prefix)
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_term_hang(self):
        """Send one message with LINGER=0, close both sockets, then term().

        Contains no assertions; success means ``self.context.term()``
        returned.
        """
        pair = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        router_sock, dealer_sock = pair

        dealer_sock.setsockopt(zmq.LINGER, 0)
        # The message is never received by the ROUTER side in this test.
        dealer_sock.send(b'hello', copy=False)

        dealer_sock.close()
        router_sock.close()

        self.context.term()
test_multipart.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_router_dealer(self):
        """Send one frame from a DEALER and inspect what the ROUTER receives.

        The ROUTER side is expected to yield two frames: a first frame with
        ``rcvmore`` true, then the original payload with ``rcvmore`` false.
        """
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        payload = b'message1'
        dealer.send(payload)

        # First frame: its content is not checked (the original bound it to a
        # local called `ident`, so presumably the sender identity -- confirm).
        self.recv(router)
        self.assertEqual(router.rcvmore, True)

        # Second frame must be the payload, with nothing following it.
        received = self.recv(router)
        self.assertEqual(payload, received)
        self.assertEqual(router.rcvmore, False)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_default_mq_args(self):
        """Construct, configure and start a ThreadMonitoredQueue with default args.

        Only the ROUTER, DEALER and PUB socket types are given to the
        constructor.
        """
        mq = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = mq

        # The same option/value pair is applied to all three sides.
        linger_off = (zmq.LINGER, 0)
        mq.setsockopt_in(*linger_off)
        mq.setsockopt_out(*linger_off)
        mq.setsockopt_mon(*linger_off)

        # Original note: this will raise if default args are wrong.
        mq.start()
        self.teardown_device()
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_mq_check_prefix(self):
        """Assert that devices.monitoredqueue raises TypeError for unicode arguments.

        ROUTER, DEALER and PUB sockets are created and tracked in
        ``self.sockets``; the call itself receives two unicode strings and
        the PUB socket.
        """
        router_sock = self.context.socket(zmq.ROUTER)
        dealer_sock = self.context.socket(zmq.DEALER)
        mons = self.context.socket(zmq.PUB)
        self.sockets.extend([router_sock, dealer_sock, mons])

        # NOTE(review): these strings are passed where the first two sockets
        # would go, not as prefix arguments -- confirm this is the intended
        # check for a test named "check_prefix".
        first_arg = unicode('in')
        second_arg = unicode('out')
        self.assertRaises(
            TypeError, devices.monitoredqueue, first_arg, second_arg, mons
        )
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_term_hang(self):
        """Exercise context termination after a zero-LINGER send on a DEALER.

        No assertions are made; the test succeeds when
        ``self.context.term()`` returns.
        """
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        # Configure and use the sending side.
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)

        # Close order: DEALER first, then ROUTER.
        closing_order = (dealer, router)
        for endpoint in closing_order:
            endpoint.close()

        self.context.term()
test_multipart.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_router_dealer(self):
        """Check frame count and content seen by a ROUTER for one DEALER send.

        After the DEALER sends a single frame, the ROUTER must report
        ``rcvmore`` as true after its first receive and false after its
        second, and the second frame must equal what was sent.
        """
        router_sock, dealer_sock = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        sent = b'message1'
        dealer_sock.send(sent)

        # Leading frame is consumed but its value is not asserted on.
        self.recv(router_sock)
        has_more = router_sock.rcvmore
        self.assertEqual(has_more, True)

        body = self.recv(router_sock)
        self.assertEqual(sent, body)

        has_more = router_sock.rcvmore
        self.assertEqual(has_more, False)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_mq_check_prefix(self):
        """Call devices.monitoredqueue with unicode strings and expect TypeError.

        Sockets of type ROUTER, DEALER and PUB are created and added to
        ``self.sockets``; of these only the PUB socket is passed on.
        """
        socket_kinds = (zmq.ROUTER, zmq.DEALER, zmq.PUB)
        opened = [self.context.socket(kind) for kind in socket_kinds]
        self.sockets.extend(opened)
        mons = opened[2]

        # NOTE(review): the two unicode values take the place of the ROUTER
        # and DEALER sockets in the call below; no prefixes are supplied.
        # Verify that this matches the intent of the test.
        as_in = unicode('in')
        as_out = unicode('out')
        self.assertRaises(TypeError, devices.monitoredqueue, as_in, as_out, mons)
monitoredqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Poll two sockets in a loop and relay whichever one has data.

    Data arriving on ``in_socket`` is relayed towards ``out_socket`` with
    ``in_prefix``; data arriving on ``out_socket`` is relayed towards
    ``in_socket`` with ``out_prefix``. In both cases ``mon_socket`` is
    passed along to ``_relay``, which does the actual transfer.

    The function loops indefinitely and has no normal return.
    """
    # Computed once up front; true only if both sockets report ROUTER type.
    swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER

    poller = zmq.Poller()
    for endpoint in (in_socket, out_socket):
        poller.register(endpoint, zmq.POLLIN)

    # Each entry: (socket to read from, socket to write to, prefix).
    directions = [
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    ]

    while True:
        readable = dict(poller.poll())
        for reader, writer, prefix in directions:
            if reader not in readable:
                continue
            _relay(reader, writer, mon_socket, prefix, swap_ids)
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_term_hang(self):
        """Make sure term() can be called once both sockets of a pair are closed.

        A single message is sent from the DEALER (LINGER set to 0) and never
        read. The test has no assertions.
        """
        socket_pair = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        receiver, sender = socket_pair

        sender.setsockopt(zmq.LINGER, 0)
        sender.send(b'hello', copy=False)

        # Sender goes away before the receiver.
        sender.close()
        receiver.close()

        self.context.term()
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_default_mq_args(self):
        """Verify a ThreadMonitoredQueue can be started without extra arguments.

        The device is created from the ROUTER, DEALER and PUB socket types
        alone, gets LINGER set to 0 on each side, and is then started and
        torn down.
        """
        device = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = device

        # in-side, out-side, monitor-side -- in that order.
        device.setsockopt_in(zmq.LINGER, 0)
        device.setsockopt_out(zmq.LINGER, 0)
        device.setsockopt_mon(zmq.LINGER, 0)

        # Original note: this will raise if default args are wrong.
        device.start()

        self.teardown_device()
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_mq_check_prefix(self):
        """Expect a TypeError from devices.monitoredqueue given unicode inputs.

        Creates ROUTER, DEALER and PUB sockets, records them in
        ``self.sockets``, then makes the call with two unicode strings and
        the PUB socket.
        """
        in_sock = self.context.socket(zmq.ROUTER)
        out_sock = self.context.socket(zmq.DEALER)
        mons = self.context.socket(zmq.PUB)
        tracked = [in_sock, out_sock, mons]
        self.sockets.extend(tracked)

        # NOTE(review): in_sock/out_sock are not used after this point; the
        # unicode strings below are passed in their positions instead.
        # Confirm whether prefixes were meant to be tested here.
        ins_text = unicode('in')
        outs_text = unicode('out')
        self.assertRaises(TypeError, devices.monitoredqueue, ins_text, outs_text, mons)
monitoredqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Relay between ``in_socket`` and ``out_socket`` until an error occurs.

    Uses a ``zmq.Poller`` registered for ``POLLIN`` on both sockets. After
    each poll, every ready socket is relayed to its counterpart via
    ``_relay``, which also receives ``mon_socket`` and the direction's
    prefix (``in_prefix`` or ``out_prefix``).

    The loop is unconditional, so the function never returns normally.
    """
    # Passed straight through to _relay; only set for ROUTER <-> ROUTER.
    ids_need_swap = (
        in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
    )

    poller = zmq.Poller()
    poller.register(in_socket, zmq.POLLIN)
    poller.register(out_socket, zmq.POLLIN)

    def forward(ready, src, dst, prefix):
        # Skip directions whose source socket was not reported by the poll.
        if src not in ready:
            return
        _relay(src, dst, mon_socket, prefix, ids_need_swap)

    while True:
        ready = dict(poller.poll())
        forward(ready, in_socket, out_socket, in_prefix)
        forward(ready, out_socket, in_socket, out_prefix)
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_term_hang(self):
        """Run the send / close / term sequence on a ROUTER-DEALER pair.

        The DEALER has LINGER set to 0 before it sends. There are no
        assertions; the test is complete once ``self.context.term()``
        returns.
        """
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)

        # Both sockets are closed before the context is terminated.
        for sock in [dealer, router]:
            sock.close()
        self.context.term()
test_multipart.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_router_dealer(self):
        """Receive a DEALER's single-frame message on a ROUTER as two frames.

        Asserts ``rcvmore`` is true after the first receive, that the second
        received frame equals the sent bytes, and that ``rcvmore`` is then
        false.
        """
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

        outgoing = b'message1'
        dealer.send(outgoing)

        # Frame 1: read and discarded; only the "more frames" flag is checked.
        self.recv(router)
        self.assertEqual(router.rcvmore, True)

        # Frame 2: must match what was sent and be the last frame.
        incoming = self.recv(router)
        self.assertEqual(outgoing, incoming)
        self.assertEqual(router.rcvmore, False)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_default_mq_args(self):
        """Smoke-test ThreadMonitoredQueue start-up using its default arguments.

        The constructor receives only the ROUTER, DEALER and PUB socket
        types.
        """
        monitored = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = monitored

        # One shared (option, value) pair for the in, out and monitor sides.
        option = (zmq.LINGER, 0)
        monitored.setsockopt_in(*option)
        monitored.setsockopt_out(*option)
        monitored.setsockopt_mon(*option)

        # Original note: this will raise if default args are wrong.
        monitored.start()
        self.teardown_device()


问题


面经


文章

微信
公众号

扫码关注公众号