def test_change_subscription(self):
# FIXME: Extensive testing showed this particular test is the root cause
# of sporadic failures on Travis.
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'test')
eventlet.sleep(0)
sub_ready = eventlet.Event()
sub_last = eventlet.Event()
sub_done = eventlet.Event()
def rx():
while sub.recv() != b'test BEGIN':
eventlet.sleep(0)
sub_ready.send()
count = 0
while True:
msg = sub.recv()
if msg == b'test BEGIN':
# BEGIN may come many times
continue
if msg == b'test LAST':
sub.setsockopt(zmq.SUBSCRIBE, b'done')
sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
eventlet.sleep(0)
# In real application you should either sync
# or tolerate loss of messages.
sub_last.send()
if msg == b'done DONE':
break
count += 1
sub_done.send(count)
def tx():
# Sync receiver ready to avoid loss of first packets
while not sub_ready.ready():
pub.send(b'test BEGIN')
eventlet.sleep(0.005)
for i in range(1, 101):
msg = 'test {0}'.format(i).encode()
if i != 50:
pub.send(msg)
else:
pub.send(b'test LAST')
sub_last.wait()
# XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
# just yield eventlet.sleep(0) doesn't cut it
eventlet.sleep(0.001)
pub.send(b'done DONE')
eventlet.spawn(rx)
eventlet.spawn(tx)
rx_count = sub_done.wait()
self.assertEqual(rx_count, 50)
评论列表
文章目录