def _create_fanout_exchange(self, Consumer, channel):
    """
    Set up the broadcast plumbing and return a consumer for it.

    A non-durable fanout exchange named ``<self.exchange>_fanout`` is
    declared on ``channel``. An exclusive, non-durable queue with a unique
    ``fanout_callback_<uuid4>`` name is then declared on the same channel
    and associated with that exchange.

    :param Consumer: callable used to build the consumer; it is invoked
        with ``queues``, ``on_message`` and ``no_ack`` keyword arguments.
    :param channel: channel on which the exchange and queue are declared.
    :return: the object returned by ``Consumer``, configured to hand every
        message from the new queue to ``self._on_broadcast`` without
        acknowledgements (``no_ack=True``).
    """
    # Exchange first: the queue declared below refers to it.
    fanout_name = '{}_fanout'.format(self.exchange)
    log.debug('Declaring fanout exchange %s', fanout_name)
    fanout = kombu.Exchange(
        name=fanout_name,
        type='fanout',
        durable=False,
        channel=channel,
    )
    fanout.declare()

    # A random suffix gives every caller its own private callback queue.
    callback_queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
    log.debug('Declaring fanout queue %s', callback_queue_name)
    callback_queue = kombu.Queue(
        name=callback_queue_name,
        exchange=fanout,
        durable=False,
        exclusive=True,
        channel=channel,
    )
    callback_queue.declare()

    return Consumer(
        queues=[callback_queue],
        on_message=self._on_broadcast,
        no_ack=True,
    )
评论列表
文章目录