backoff.py 文件源码

python
阅读 52 收藏 0 点赞 0 评论 0

项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码
def republish(self, backoff_exc, message, target_queue):

        expiration = backoff_exc.next(message, self.exchange.name)
        queue = self.make_queue(expiration)

        # republish to appropriate backoff queue
        amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
        with get_producer(amqp_uri) as producer:

            properties = message.properties.copy()
            headers = properties.pop('application_headers')

            headers['backoff'] = expiration
            expiration_seconds = float(expiration) / 1000

            # force redeclaration; the publisher will skip declaration if
            # the entity has previously been declared by the same connection
            conn = Connection(amqp_uri)
            maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

            producer.publish(
                message.body,
                headers=headers,
                exchange=self.exchange,
                routing_key=target_queue,
                expiration=expiration_seconds,
                retry=True,
                retry_policy=DEFAULT_RETRY_POLICY,
                declare=[queue.exchange, queue],
                **properties
            )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号