def _receive_with_timeout(self, socket, timeout_s, use_multipart=False):
"""Check for socket activity and either return what's
received on the socket or time out if timeout_s expires
without anything on the socket.
This is implemented in loops of self.try_length_ms milliseconds
to allow Ctrl-C handling to take place.
"""
if timeout_s is config.FOREVER:
timeout_ms = config.FOREVER
else:
timeout_ms = int(1000 * timeout_s)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
ms_so_far = 0
try:
for interval_ms in self.intervals_ms(timeout_ms):
sockets = dict(poller.poll(interval_ms))
ms_so_far += interval_ms
if socket in sockets:
if use_multipart:
return socket.recv_multipart()
else:
return socket.recv()
else:
raise core.SocketTimedOutError(timeout_s)
except KeyboardInterrupt:
raise core.SocketInterruptedError(ms_so_far / 1000.0)
评论列表
文章目录