def test_recv_spawned_before_send_is_non_blocking(self):
    """Check that a recv() spawned before the matching send() still completes.

    A greenthread is spawned to call ``recv()`` on one end of a PAIR/PAIR
    socket pair; the main greenthread then sends ``b'test'`` on the other
    end and waits on an event that the receiver fires once it has stored
    the received message.
    """
    sender, receiver, _port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

    # Give up control once before starting the exchange
    # (presumably to let the freshly created pair settle -- confirm).
    eventlet.sleep()

    # Mutable holder so the nested function can hand its result back.
    received = {'res': None}
    finished = eventlet.Event()

    def receive_one():
        # Runs in its own greenthread: store the message, then signal.
        received['res'] = receiver.recv()
        finished.send('done')

    eventlet.spawn(receive_one)
    sender.send(b'test')

    # Wait until the receiver greenthread reports completion.
    finished.wait()
    self.assertEqual(received['res'], b'test')
评论列表
文章目录