def _configure_retry_exchanges(self, connection):
    """Declare the retry exchanges and queues, retrying on failure.

    Two direct exchanges and one queue per exchange are created on a new
    channel obtained from ``connection`` and both queues are declared. The
    declaration is wrapped with ``connection.ensure`` so that a failure is
    logged and the declaration is attempted again; the retry intervals are
    configured as start=0, step=5 and max=30 seconds, and no retry limit
    is passed.

    Args:
        connection: Broker connection exposing ``channel()`` and
            ``ensure()`` (presumably a kombu ``Connection`` -- confirm
            against the imports of this file).

    Returns:
        Whatever the ensured declaration returns, i.e. the retry exchange
        produced by the inner ``declare_queues``.
    """
    def declare_queues():
        # All four entities share one channel, so the declare() calls below
        # are issued on that same channel.
        channel = connection.channel()
        almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(),
                                     type='direct',
                                     channel=channel)
        retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(),
                                  type='direct',
                                  channel=channel)
        # The retry queue is the only entity given extra queue arguments;
        # their content is decided by self._get_queue_arguments().
        retry_queue = Queue(name=config.rabbitmq_retry_queue(),
                            exchange=retry_exchange,
                            routing_key=config.rabbitmq_routing_key(),
                            queue_arguments=self._get_queue_arguments(),
                            channel=channel)
        # Unlike the retry queue, the main queue is explicitly non-durable.
        almanach_queue = Queue(name=config.rabbitmq_queue(),
                               exchange=almanach_exchange,
                               durable=False,
                               routing_key=config.rabbitmq_routing_key(),
                               channel=channel)

        retry_queue.declare()
        almanach_queue.declare()

        return retry_exchange

    def error_callback(exception, interval):
        # Hand the values to logging as arguments instead of formatting
        # here: a formatting problem is then handled by the logging module
        # rather than raising from inside the retry hook.
        logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r',
                      interval, exception)

    # Keep the wrapper under its own name so the inner function is not
    # shadowed by the callable that ensure() returns.
    ensured_declare = connection.ensure(connection, declare_queues, errback=error_callback,
                                        interval_start=0, interval_step=5, interval_max=30)
    return ensured_declare()
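# NOTE: stray web-page navigation text, not part of the code: "评论列表" (comment list)
# NOTE: stray web-page navigation text, not part of the code: "文章目录" (table of contents)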