def test_recv_timeout():
    """Check that ``recv`` on a socket with ``zmq.RCVTIMEO`` set times out.

    Regression test for https://github.com/eventlet/eventlet/issues/282

    Nothing is sent to the SUB socket in this test, so ``recv()`` has no
    message to return. The expected outcome is a ``zmq.ZMQError`` that
    ``eventlet.is_timeout`` recognises as a timeout.
    """
    # Only the SUB socket is used; the other two values yielded by
    # clean_pair are discarded.
    with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
        sub.setsockopt(zmq.RCVTIMEO, 100)
        try:
            # Guard so a regression cannot hang the test run: the second
            # argument (False) makes this Timeout exit the block silently
            # instead of raising, and control then reaches the assertion
            # below.
            with eventlet.Timeout(1, False):
                sub.recv()
            # Reached if recv() returned or the guard above expired, i.e.
            # RCVTIMEO did not produce an error.
            assert False, "recv() did not raise zmq.ZMQError despite RCVTIMEO"
        except zmq.ZMQError as e:
            assert eventlet.is_timeout(e)
评论列表
文章目录