def _create_service_queues(self, services, Consumer, channel):
    """
    Declare the exchange and one queue per service, then build a consumer.

    The exchange named by ``self.exchange`` is declared as non-durable on
    ``channel``. For every value in ``services`` a non-durable,
    non-exclusive queue named ``<exchange>_service_<service.name>`` is
    declared on the same channel, attached to that exchange and using the
    queue name as its routing key.

    :param services: mapping whose values expose a ``name`` attribute.
    :param Consumer: callable invoked with the ``queues``, ``on_message``
        and ``no_ack`` keyword arguments to create the consumer.
        NOTE(review): presumably already bound to a channel by the
        caller, since none is passed here -- confirm.
    :param channel: channel given to ``kombu.Exchange`` and ``kombu.Queue``.
    :return: the object produced by ``Consumer``; messages are routed to
        ``self._on_message`` and acknowledgements are required
        (``no_ack=False``).
    """
    exchange_name = self.exchange

    log.debug('Declaring exchange %s', exchange_name)
    service_exchange = kombu.Exchange(
        exchange_name,
        durable=False,
        channel=channel,
    )
    service_exchange.declare()

    def declare_queue_for(service):
        # The queue name doubles as the routing key on the exchange.
        name = '{}_service_{}'.format(exchange_name, service.name)
        log.debug('Declaring service queue %s', name)
        service_queue = kombu.Queue(
            name=name,
            routing_key=name,
            exchange=service_exchange,
            channel=channel,
            durable=False,
            exclusive=False,
        )
        service_queue.declare()
        return service_queue

    # Queues are declared one after another, in the mapping's value order.
    declared_queues = [
        declare_queue_for(service) for service in services.values()
    ]

    return Consumer(
        queues=declared_queues,
        on_message=self._on_message,
        no_ack=False,
    )
# Web-page residue (not code): "评论列表" (comment list), "文章目录" (article table of contents)