def test_send_1k_push_pull(self):
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
eventlet.sleep()
done = eventlet.Event()
def tx():
tx_i = 0
while tx_i <= 1000:
tx_i += 1
down.send(str(tx_i).encode())
def rx():
while True:
rx_i = up.recv()
if rx_i == b"1000":
done.send(0)
break
eventlet.spawn(tx)
eventlet.spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
评论列表
文章目录