def test_recv_during_send(self):
    """Send 1001 small messages from a spawned greenthread and wait for it.

    The sender's high-water mark and send buffer, and the receiver's
    receive buffer, are all set to 10 before the sending greenthread is
    started. The test passes when that greenthread runs to completion and
    signals ``0`` through the event.
    """
    sender, receiver, _port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
    # NOTE(review): presumably yields to the hub so the pair can finish
    # connecting -- confirm against create_bound_pair.
    eventlet.sleep()

    finished = eventlet.Event()

    # ZeroMQ <3.0 only exposes a single HWM option instead of SNDHWM.
    hwm_option = zmq.SNDHWM if hasattr(zmq, "SNDHWM") else zmq.HWM

    # Every limit is set to the same small value of 10.
    limits = (
        (sender, hwm_option),
        (sender, zmq.SNDBUF),
        (receiver, zmq.RCVBUF),
    )
    for sock, option in limits:
        sock.setsockopt(option, 10)

    def pump_messages():
        # Payloads are the encoded decimal strings "0" through "1000".
        for seq in range(1001):
            sender.send(str(seq).encode())
        finished.send(0)

    eventlet.spawn(pump_messages)
    result = finished.wait()
    self.assertEqual(result, 0)
评论列表
文章目录