def _receive_message(self):
"""
Internal coroutine for receiving messages
"""
while True:
try:
if self._socket.getsockopt(zmq.TYPE) == zmq.ROUTER:
zmq_identity, msg_bytes = \
yield from self._socket.recv_multipart()
self._received_from_identity(zmq_identity)
self._dispatcher_queue.put_nowait(
(zmq_identity, msg_bytes))
else:
msg_bytes = yield from self._socket.recv()
self._last_message_time = time.time()
self._dispatcher_queue.put_nowait((None, msg_bytes))
except CancelledError:
# The concurrent.futures.CancelledError is caught by asyncio
# when the Task associated with the coroutine is cancelled.
# The raise is required to stop this component.
raise
except Exception as e: # pylint: disable=broad-except
LOGGER.exception("Received a message on address %s that "
"caused an error: %s", self._address, e)
评论列表
文章目录