def test_tracker(self):
"test the MessageTracker object for tracking when zmq is done with a buffer"
addr = 'tcp://127.0.0.1'
a = self.context.socket(zmq.PUB)
port = a.bind_to_random_port(addr)
a.close()
iface = "%s:%i"%(addr,port)
a = self.context.socket(zmq.PAIR)
# a.setsockopt(zmq.IDENTITY, b"a")
b = self.context.socket(zmq.PAIR)
self.sockets.extend([a,b])
a.connect(iface)
time.sleep(0.1)
p1 = a.send(b'something', copy=False, track=True)
self.assertTrue(isinstance(p1, zmq.MessageTracker))
self.assertFalse(p1.done)
p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
self.assert_(isinstance(p2, zmq.MessageTracker))
self.assertEqual(p2.done, False)
self.assertEqual(p1.done, False)
b.bind(iface)
msg = b.recv_multipart()
for i in range(10):
if p1.done:
break
time.sleep(0.1)
self.assertEqual(p1.done, True)
self.assertEqual(msg, [b'something'])
msg = b.recv_multipart()
for i in range(10):
if p2.done:
break
time.sleep(0.1)
self.assertEqual(p2.done, True)
self.assertEqual(msg, [b'something', b'else'])
m = zmq.Frame(b"again", track=True)
self.assertEqual(m.tracker.done, False)
p1 = a.send(m, copy=False)
p2 = a.send(m, copy=False)
self.assertEqual(m.tracker.done, False)
self.assertEqual(p1.done, False)
self.assertEqual(p2.done, False)
msg = b.recv_multipart()
self.assertEqual(m.tracker.done, False)
self.assertEqual(msg, [b'again'])
msg = b.recv_multipart()
self.assertEqual(m.tracker.done, False)
self.assertEqual(msg, [b'again'])
self.assertEqual(p1.done, False)
self.assertEqual(p2.done, False)
pm = m.tracker
del m
for i in range(10):
if p1.done:
break
time.sleep(0.1)
self.assertEqual(p1.done, True)
self.assertEqual(p2.done, True)
m = zmq.Frame(b'something', track=False)
self.assertRaises(ValueError, a.send, m, copy=False, track=True)
评论列表
文章目录