def republish(self, backoff_exc, message, target_queue):
    """Republish ``message`` via a backoff queue so it can be retried later.

    The expiration is obtained from ``backoff_exc.next(...)`` and the
    matching backoff queue from ``self.make_queue(expiration)``. The
    original body and properties are then published to ``self.exchange``
    with ``target_queue`` as the routing key, a ``backoff`` header holding
    the expiration, and a per-message expiration of ``expiration / 1000``.

    :param backoff_exc: object whose ``next(message, exchange_name)``
        returns the expiration for this attempt (presumably milliseconds,
        since it is divided by 1000 to get ``expiration_seconds``).
    :param message: the received message; its ``body`` and ``properties``
        are reused for the republished copy.
    :param target_queue: routing key used for the republished message.
    """
    expiration = backoff_exc.next(message, self.exchange.name)
    queue = self.make_queue(expiration)

    # republish to appropriate backoff queue
    amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
    with get_producer(amqp_uri) as producer:
        # NOTE: ``copy()`` is shallow (assuming ``properties`` is a plain
        # dict), so ``headers`` is the same object the original message
        # holds and the ``backoff`` entry is visible on ``message`` too.
        properties = message.properties.copy()
        headers = properties.pop('application_headers')

        headers['backoff'] = expiration
        expiration_seconds = float(expiration) / 1000

        # force redeclaration; the publisher will skip declaration if
        # the entity has previously been declared by the same connection.
        # The connection exists only for this declaration, so scope it with
        # a context manager to release it instead of leaking one
        # connection per republished message.
        with Connection(amqp_uri) as conn:
            maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

        producer.publish(
            message.body,
            headers=headers,
            exchange=self.exchange,
            routing_key=target_queue,
            expiration=expiration_seconds,
            retry=True,
            retry_policy=DEFAULT_RETRY_POLICY,
            declare=[queue.exchange, queue],
            **properties
        )
评论列表
文章目录