retry_queue_consumer.py 文件源码

python
阅读 15 收藏 0 点赞 0 评论 0

项目:EasyJobLite 作者: treebohotels 项目源码 文件源码
def _shovel_to_buffer(self, from_queue):
        """
        poor man's alternative to the shovel plugin

        :param from_queue: shovel messages from which queue? entity.Queue object
        """
        logger = logging.getLogger(self.__class__.__name__)
        logger.info("shovelling all messages from error queue to buffer queue")

        channel = self._conn.channel()

        # prep a consumer for the from_queue only
        queue_consumer = Consumer(channel=channel,
                                  queues=[from_queue],
                                  callbacks=[self._shoveller])
        queue_consumer.consume()

        # finally drain all the work items from error-queue into shoveller
        while True:
            try:
                self._conn.drain_events(timeout=1)

            except socket.timeout:
                logger.debug("No more work-items in {q}".format(q=from_queue.name))
                break

            except socket.error as e:
                # we don't care about EAGAIN, since we had intentionally started a non-blocking conn
                if e.errno == 35:
                    msg = "{q} is empty".format(q=from_queue.name)
                    logger.debug(msg)
                    break

        # disconnect
        queue_consumer.cancel()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号