def _shovel_to_buffer(self, from_queue):
    """
    Poor man's alternative to the shovel plugin.

    Opens a channel on ``self._conn``, registers a consumer on ``from_queue``
    whose callback is ``self._shoveller``, and drains events from the
    connection until no more arrive. The consumer is cancelled before
    returning, including when an error propagates.

    :param from_queue: queue to shovel messages from (entity.Queue object)
    :raises socket.error: any socket error other than a timeout or
        EAGAIN/EWOULDBLOCK raised while draining events
    """
    # Local import keeps this block self-contained; only needed for the
    # portable "would block" errno check below.
    import errno

    logger = logging.getLogger(self.__class__.__name__)
    logger.info("shovelling all messages from error queue to buffer queue")
    channel = self._conn.channel()
    # NOTE(review): this channel is never explicitly closed here -- confirm
    # whether the connection's owner is expected to clean it up.

    # prep a consumer for the from_queue only
    queue_consumer = Consumer(channel=channel,
                              queues=[from_queue],
                              callbacks=[self._shoveller])
    queue_consumer.consume()
    try:
        # finally drain all the work items from error-queue into shoveller
        while True:
            try:
                self._conn.drain_events(timeout=1)
            except socket.timeout:
                # Nothing arrived within the timeout: treat the queue as drained.
                logger.debug("No more work-items in {q}".format(q=from_queue.name))
                break
            except socket.error as e:
                # we don't care about EAGAIN, since we had intentionally
                # started a non-blocking conn. Use the errno constants rather
                # than a literal: the numeric value differs between platforms
                # (e.g. 35 on BSD/macOS, 11 on Linux).
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    msg = "{q} is empty".format(q=from_queue.name)
                    logger.debug(msg)
                    break
                # Any other socket error is unexpected; retrying silently
                # could spin forever on a broken connection, so surface it.
                raise
    finally:
        # disconnect the consumer even if draining failed
        queue_consumer.cancel()
# (Scraped page navigation text, not part of the code: "Comment list", "Article table of contents".)