def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Shuttle traffic between two sockets, handing each hop to ``_relay``.

    Both ``in_socket`` and ``out_socket`` are registered for ``zmq.POLLIN``.
    Whenever one of them appears in the poll results, ``_relay`` is called
    with that socket as the source, the other one as the destination, plus
    ``mon_socket`` and the prefix belonging to that direction.

    Parameters
    ----------
    in_socket, out_socket :
        The two endpoints being polled; their ``.type`` is compared
        against ``zmq.ROUTER``.
    mon_socket :
        Passed through to ``_relay`` unchanged.
    in_prefix, out_prefix :
        Passed to ``_relay`` for traffic read from ``in_socket`` and
        ``out_socket`` respectively.

    The body is an unconditional ``while True`` loop with no ``break`` or
    ``return``, so this only exits if something it calls raises.
    """
    # Flag handed to _relay: true only when *both* endpoints are ROUTER.
    swap_ids = all(sock.type == zmq.ROUTER for sock in (in_socket, out_socket))

    poller = zmq.Poller()
    for sock in (in_socket, out_socket):
        poller.register(sock, zmq.POLLIN)

    # (source, destination, prefix) per direction; the in-side is serviced
    # first on every pass.
    routes = (
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    )
    while True:
        ready = dict(poller.poll())
        for source, dest, prefix in routes:
            if source in ready:
                _relay(source, dest, mon_socket, prefix, swap_ids)
# Example Python source code using ROUTER
def test_term_hang(self):
    # Send one message from the DEALER side of a bound ROUTER/DEALER pair,
    # close both sockets, then terminate the context.
    # NOTE(review): judging by the name, this presumably guards against
    # context.term() hanging on the pending message -- confirm.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    # LINGER is zeroed on the sending side before anything is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    # Sender is closed first, then its peer.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_default_mq_args(self):
    # Build a ThreadMonitoredQueue from the three socket types only,
    # leaving every other constructor argument at its default.
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue
    # Zero LINGER on the in, out and monitor sides, in that order.
    for set_option in (queue.setsockopt_in,
                       queue.setsockopt_out,
                       queue.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # Starting the device is the actual check: it raises if the default
    # arguments are wrong.
    queue.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    # Create ROUTER, DEALER and PUB sockets and add them to self.sockets.
    router = self.context.socket(zmq.ROUTER)
    dealer = self.context.socket(zmq.DEALER)
    monitor = self.context.socket(zmq.PUB)
    self.sockets.extend([router, dealer, monitor])

    # NOTE(review): the unicode strings go in the first two argument
    # positions, in place of the ROUTER/DEALER sockets created above, and
    # no prefix arguments are passed -- confirm this is the intended check.
    text_in = unicode('in')
    text_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, text_in, text_out, monitor)
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Shuttle traffic between two sockets, handing each hop to ``_relay``.

    Both ``in_socket`` and ``out_socket`` are registered for ``zmq.POLLIN``.
    Whenever one of them appears in the poll results, ``_relay`` is called
    with that socket as the source, the other one as the destination, plus
    ``mon_socket`` and the prefix belonging to that direction.

    Parameters
    ----------
    in_socket, out_socket :
        The two endpoints being polled; their ``.type`` is compared
        against ``zmq.ROUTER``.
    mon_socket :
        Passed through to ``_relay`` unchanged.
    in_prefix, out_prefix :
        Passed to ``_relay`` for traffic read from ``in_socket`` and
        ``out_socket`` respectively.

    The body is an unconditional ``while True`` loop with no ``break`` or
    ``return``, so this only exits if something it calls raises.
    """
    # Flag handed to _relay: true only when *both* endpoints are ROUTER.
    swap_ids = all(sock.type == zmq.ROUTER for sock in (in_socket, out_socket))

    poller = zmq.Poller()
    for sock in (in_socket, out_socket):
        poller.register(sock, zmq.POLLIN)

    # (source, destination, prefix) per direction; the in-side is serviced
    # first on every pass.
    routes = (
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    )
    while True:
        ready = dict(poller.poll())
        for source, dest, prefix in routes:
            if source in ready:
                _relay(source, dest, mon_socket, prefix, swap_ids)
def test_term_hang(self):
    # Send one message from the DEALER side of a bound ROUTER/DEALER pair,
    # close both sockets, then terminate the context.
    # NOTE(review): judging by the name, this presumably guards against
    # context.term() hanging on the pending message -- confirm.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    # LINGER is zeroed on the sending side before anything is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    # Sender is closed first, then its peer.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_router_dealer(self):
    # A single frame sent from the DEALER side must reach the ROUTER as two
    # parts: a leading part with rcvmore set, then the payload itself with
    # rcvmore cleared.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

    payload = b'message1'
    dealer.send(payload)

    # The leading part is received but never inspected (presumably the
    # sender's identity frame -- confirm); only the rcvmore flag is checked.
    _leading_part = self.recv(router)
    self.assertEqual(router.rcvmore, True)

    received = self.recv(router)
    self.assertEqual(payload, received)
    self.assertEqual(router.rcvmore, False)
def test_default_mq_args(self):
    # Build a ThreadMonitoredQueue from the three socket types only,
    # leaving every other constructor argument at its default.
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue
    # Zero LINGER on the in, out and monitor sides, in that order.
    for set_option in (queue.setsockopt_in,
                       queue.setsockopt_out,
                       queue.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # Starting the device is the actual check: it raises if the default
    # arguments are wrong.
    queue.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    # Create ROUTER, DEALER and PUB sockets and add them to self.sockets.
    router = self.context.socket(zmq.ROUTER)
    dealer = self.context.socket(zmq.DEALER)
    monitor = self.context.socket(zmq.PUB)
    self.sockets.extend([router, dealer, monitor])

    # NOTE(review): the unicode strings go in the first two argument
    # positions, in place of the ROUTER/DEALER sockets created above, and
    # no prefix arguments are passed -- confirm this is the intended check.
    text_in = unicode('in')
    text_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, text_in, text_out, monitor)
def test_term_hang(self):
    # Send one message from the DEALER side of a bound ROUTER/DEALER pair,
    # close both sockets, then terminate the context.
    # NOTE(review): judging by the name, this presumably guards against
    # context.term() hanging on the pending message -- confirm.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    # LINGER is zeroed on the sending side before anything is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    # Sender is closed first, then its peer.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_router_dealer(self):
    # A single frame sent from the DEALER side must reach the ROUTER as two
    # parts: a leading part with rcvmore set, then the payload itself with
    # rcvmore cleared.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

    payload = b'message1'
    dealer.send(payload)

    # The leading part is received but never inspected (presumably the
    # sender's identity frame -- confirm); only the rcvmore flag is checked.
    _leading_part = self.recv(router)
    self.assertEqual(router.rcvmore, True)

    received = self.recv(router)
    self.assertEqual(payload, received)
    self.assertEqual(router.rcvmore, False)
def test_mq_check_prefix(self):
    # Create ROUTER, DEALER and PUB sockets and add them to self.sockets.
    router = self.context.socket(zmq.ROUTER)
    dealer = self.context.socket(zmq.DEALER)
    monitor = self.context.socket(zmq.PUB)
    self.sockets.extend([router, dealer, monitor])

    # NOTE(review): the unicode strings go in the first two argument
    # positions, in place of the ROUTER/DEALER sockets created above, and
    # no prefix arguments are passed -- confirm this is the intended check.
    text_in = unicode('in')
    text_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, text_in, text_out, monitor)
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Shuttle traffic between two sockets, handing each hop to ``_relay``.

    Both ``in_socket`` and ``out_socket`` are registered for ``zmq.POLLIN``.
    Whenever one of them appears in the poll results, ``_relay`` is called
    with that socket as the source, the other one as the destination, plus
    ``mon_socket`` and the prefix belonging to that direction.

    Parameters
    ----------
    in_socket, out_socket :
        The two endpoints being polled; their ``.type`` is compared
        against ``zmq.ROUTER``.
    mon_socket :
        Passed through to ``_relay`` unchanged.
    in_prefix, out_prefix :
        Passed to ``_relay`` for traffic read from ``in_socket`` and
        ``out_socket`` respectively.

    The body is an unconditional ``while True`` loop with no ``break`` or
    ``return``, so this only exits if something it calls raises.
    """
    # Flag handed to _relay: true only when *both* endpoints are ROUTER.
    swap_ids = all(sock.type == zmq.ROUTER for sock in (in_socket, out_socket))

    poller = zmq.Poller()
    for sock in (in_socket, out_socket):
        poller.register(sock, zmq.POLLIN)

    # (source, destination, prefix) per direction; the in-side is serviced
    # first on every pass.
    routes = (
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    )
    while True:
        ready = dict(poller.poll())
        for source, dest, prefix in routes:
            if source in ready:
                _relay(source, dest, mon_socket, prefix, swap_ids)
def test_term_hang(self):
    # Send one message from the DEALER side of a bound ROUTER/DEALER pair,
    # close both sockets, then terminate the context.
    # NOTE(review): judging by the name, this presumably guards against
    # context.term() hanging on the pending message -- confirm.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    # LINGER is zeroed on the sending side before anything is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    # Sender is closed first, then its peer.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_default_mq_args(self):
    # Build a ThreadMonitoredQueue from the three socket types only,
    # leaving every other constructor argument at its default.
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue
    # Zero LINGER on the in, out and monitor sides, in that order.
    for set_option in (queue.setsockopt_in,
                       queue.setsockopt_out,
                       queue.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # Starting the device is the actual check: it raises if the default
    # arguments are wrong.
    queue.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    # Create ROUTER, DEALER and PUB sockets and add them to self.sockets.
    router = self.context.socket(zmq.ROUTER)
    dealer = self.context.socket(zmq.DEALER)
    monitor = self.context.socket(zmq.PUB)
    self.sockets.extend([router, dealer, monitor])

    # NOTE(review): the unicode strings go in the first two argument
    # positions, in place of the ROUTER/DEALER sockets created above, and
    # no prefix arguments are passed -- confirm this is the intended check.
    text_in = unicode('in')
    text_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, text_in, text_out, monitor)
def monitored_queue(in_socket, out_socket, mon_socket,
                    in_prefix=b'in', out_prefix=b'out'):
    """Shuttle traffic between two sockets, handing each hop to ``_relay``.

    Both ``in_socket`` and ``out_socket`` are registered for ``zmq.POLLIN``.
    Whenever one of them appears in the poll results, ``_relay`` is called
    with that socket as the source, the other one as the destination, plus
    ``mon_socket`` and the prefix belonging to that direction.

    Parameters
    ----------
    in_socket, out_socket :
        The two endpoints being polled; their ``.type`` is compared
        against ``zmq.ROUTER``.
    mon_socket :
        Passed through to ``_relay`` unchanged.
    in_prefix, out_prefix :
        Passed to ``_relay`` for traffic read from ``in_socket`` and
        ``out_socket`` respectively.

    The body is an unconditional ``while True`` loop with no ``break`` or
    ``return``, so this only exits if something it calls raises.
    """
    # Flag handed to _relay: true only when *both* endpoints are ROUTER.
    swap_ids = all(sock.type == zmq.ROUTER for sock in (in_socket, out_socket))

    poller = zmq.Poller()
    for sock in (in_socket, out_socket):
        poller.register(sock, zmq.POLLIN)

    # (source, destination, prefix) per direction; the in-side is serviced
    # first on every pass.
    routes = (
        (in_socket, out_socket, in_prefix),
        (out_socket, in_socket, out_prefix),
    )
    while True:
        ready = dict(poller.poll())
        for source, dest, prefix in routes:
            if source in ready:
                _relay(source, dest, mon_socket, prefix, swap_ids)
def test_term_hang(self):
    # Send one message from the DEALER side of a bound ROUTER/DEALER pair,
    # close both sockets, then terminate the context.
    # NOTE(review): judging by the name, this presumably guards against
    # context.term() hanging on the pending message -- confirm.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    # LINGER is zeroed on the sending side before anything is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    # Sender is closed first, then its peer.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_router_dealer(self):
    # A single frame sent from the DEALER side must reach the ROUTER as two
    # parts: a leading part with rcvmore set, then the payload itself with
    # rcvmore cleared.
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

    payload = b'message1'
    dealer.send(payload)

    # The leading part is received but never inspected (presumably the
    # sender's identity frame -- confirm); only the rcvmore flag is checked.
    _leading_part = self.recv(router)
    self.assertEqual(router.rcvmore, True)

    received = self.recv(router)
    self.assertEqual(payload, received)
    self.assertEqual(router.rcvmore, False)
def test_default_mq_args(self):
    # Build a ThreadMonitoredQueue from the three socket types only,
    # leaving every other constructor argument at its default.
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue
    # Zero LINGER on the in, out and monitor sides, in that order.
    for set_option in (queue.setsockopt_in,
                       queue.setsockopt_out,
                       queue.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # Starting the device is the actual check: it raises if the default
    # arguments are wrong.
    queue.start()
    self.teardown_device()