def get_consumers(self, consumer, channel):
    queue = kombu.Queue(config.rabbitmq_queue(), routing_key=config.rabbitmq_routing_key())
    return [consumer(
        [queue],
        callbacks=[self.on_message],
        auto_declare=False)]
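
A minimal, self-contained sketch of the kombu.mixins.ConsumerMixin harness that a get_consumers override like the one above plugs into (broker URL, exchange, queue name and routing key are placeholders, not from the source):

import kombu
from kombu.mixins import ConsumerMixin

class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection  # required by ConsumerMixin

    def get_consumers(self, Consumer, channel):
        exchange = kombu.Exchange('example-exchange', type='direct')
        queue = kombu.Queue('example-queue', exchange, routing_key='example-key')
        return [Consumer([queue], callbacks=[self.on_message])]

    def on_message(self, body, message):
        print(body)
        message.ack()

if __name__ == '__main__':
    with kombu.Connection('amqp://guest:guest@localhost//') as conn:
        Worker(conn).run()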
def _configure_retry_exchanges(self, connection):
    def declare_queues():
        channel = connection.channel()
        almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(),
                                     type='direct',
                                     channel=channel)
        retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(),
                                  type='direct',
                                  channel=channel)
        retry_queue = Queue(name=config.rabbitmq_retry_queue(),
                            exchange=retry_exchange,
                            routing_key=config.rabbitmq_routing_key(),
                            queue_arguments=self._get_queue_arguments(),
                            channel=channel)
        almanach_queue = Queue(name=config.rabbitmq_queue(),
                               exchange=almanach_exchange,
                               durable=False,
                               routing_key=config.rabbitmq_routing_key(),
                               channel=channel)
        retry_queue.declare()
        almanach_queue.declare()
        return retry_exchange

    def error_callback(exception, interval):
        logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r',
                      interval, exception)

    declare_queues = connection.ensure(connection, declare_queues, errback=error_callback,
                                       interval_start=0, interval_step=5, interval_max=30)
    return declare_queues()
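
The _get_queue_arguments helper is not shown in the source; a plausible implementation for this retry topology uses RabbitMQ dead-lettering, so that messages expiring in the retry queue flow back to the return exchange (the TTL value is an assumption):

def _get_queue_arguments(self):
    # RabbitMQ-specific queue arguments: after the TTL elapses, a message is
    # dead-lettered back to the return exchange with the original routing key.
    return {
        'x-message-ttl': 30 * 1000,  # assumed retry delay in milliseconds
        'x-dead-letter-exchange': config.rabbitmq_retry_return_exchange(),
        'x-dead-letter-routing-key': config.rabbitmq_routing_key(),
    }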
def __init__(self, broker_url, queue_name, fetch_count=10):
    """Initialize the consumer.

    Args:
        broker_url (string): URL of the broker to connect to.
        queue_name (string): Name of the queue to consume from.
        fetch_count (int): Number of messages to prefetch.
    """
    self.queue_name = queue_name
    self.broker_url = broker_url
    self.fetch_count = fetch_count
    self.connection = Connection(broker_url)
    self.queue = Queue(queue_name)
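
A hedged sketch of how this wrapper might consume, mapping fetch_count to the channel prefetch window (the drain method and its callback signature are assumptions, not part of the source):

import socket

def drain(self, on_message):
    # Consume self.queue; on_message is called with (body, message) per kombu.
    with self.connection.Consumer(self.queue, callbacks=[on_message]) as consumer:
        consumer.qos(prefetch_count=self.fetch_count)
        try:
            while True:
                self.connection.drain_events(timeout=5)
        except socket.timeout:
            pass  # no more messages arrived within the timeout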
def _queue(self, conn=None):
    exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
    queue = kombu.Queue(str(uuid.uuid4()), exchange)
    return queue
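
For context, a hedged publish-side sketch: because each subscriber binds its own randomly named queue to the fanout exchange, every subscriber receives every message (connection URL and exchange name are placeholders):

import kombu

conn = kombu.Connection('amqp://guest:guest@localhost//')
exchange = kombu.Exchange('updates', type='fanout', durable=False)
producer = conn.Producer()
producer.publish({'event': 'ping'}, exchange=exchange, declare=[exchange])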
def setup(self):
    super(ReplyConsumer, self).setup()
    config = self.container.config
    # Declare the consumer queue for this service in the current region.
    self.queue = Queue(
        exchange=orders_exchange,
        routing_key='{}_{}'.format(
            config['REGION'],
            ROUTING_KEY_CALCULATE_TAXES
        ),
        name='fed.{}_{}'.format(
            config['REGION'], ROUTING_KEY_CALCULATE_TAXES
        )
    )
    # Bind federated queues in all regions to the `orders` exchange
    # with the correct routing key.
    with get_connection(config[AMQP_URI_CONFIG_KEY]) as connection:
        maybe_declare(orders_exchange, connection)
        self._bind_queues_in_for_all_regions(
            ROUTING_KEY_CALCULATE_TAXES, connection
        )
        self._bind_queues_in_for_all_regions(
            ROUTING_KEY_CALCULATE_TAXES_REPLY, connection
        )
def _bind_queues_in_for_all_regions(self, routing_key, connection):
    for region in REGIONS:
        maybe_declare(Queue(
            exchange=orders_exchange,
            routing_key='{}_{}'.format(
                region, routing_key
            ),
            name='fed.{}_{}'.format(
                region, routing_key
            )
        ), connection)
def setup(self):
    reply_queue_name = "{}_{}".format(
        self.container.config['REGION'], ROUTING_KEY_CALCULATE_TAXES_REPLY
    )
    queue = Queue(
        exchange=orders_exchange,
        routing_key=reply_queue_name,
        name='fed.{}'.format(reply_queue_name)
    )
    self.queue = queue
    super(DynamicConsumer, self).setup()
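
The setup methods above lean on module-level constants from the surrounding project; a plausible shape for them, with illustrative values (AMQP_URI is nameko's documented config key, the rest are guesses):

from kombu import Exchange

AMQP_URI_CONFIG_KEY = 'AMQP_URI'
REGIONS = ('europe', 'asia', 'america')
ROUTING_KEY_CALCULATE_TAXES = 'calculate_taxes'
ROUTING_KEY_CALCULATE_TAXES_REPLY = 'calculate_taxes_reply'
orders_exchange = Exchange('orders', type='topic')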
def get_consumers(self, consumer, channel):
    api_event_queue = Queue(
        "zstack.ui.api.event.%s" % self.uuid,
        exchange=self.broadcast_exchange,
        routing_key="key.event.API.API_EVENT",
        auto_delete=True)
    canonical_event_queue = Queue(
        "zstack.ui.canonical.event.%s" % self.uuid,
        exchange=self.broadcast_exchange,
        routing_key="key.event.LOCAL.canonicalEvent",
        auto_delete=True)
    reply_queue_name = "zstack.ui.message.%s" % self.uuid
    # The reply queue is bound to the P2P exchange under several routing keys.
    reply_queue = Queue(
        reply_queue_name,
        [binding(self.p2p_exchange, "zstack.message.vmInstance.#"),
         binding(self.p2p_exchange, "zstack.message.ecs.vm.#"),
         binding(self.p2p_exchange, "zstack.message.aliyun.sdk.#")],
        auto_delete=True)
    return [
        consumer(
            queues=[canonical_event_queue],
            callbacks=[self.on_canonical_event]),
        consumer(
            queues=[api_event_queue],
            callbacks=[self.on_api_event]),
        consumer(
            queues=[reply_queue],
            callbacks=[self.on_message])
    ]
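
The broadcast and P2P exchange attributes used above are assumed to be set up elsewhere along these lines (exchange names and types are guesses, not confirmed by the source):

import kombu

def _setup_exchanges(self):
    self.broadcast_exchange = kombu.Exchange('BROADCAST', type='topic')
    self.p2p_exchange = kombu.Exchange('P2P', type='topic')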
def get_consumers(self, consumer, channel):
    # exchange_name, queue_name and routing_key are module-level settings here.
    exchange = Exchange(exchange_name, type="topic", durable=True)
    queue = Queue(queue_name, exchange, routing_key=routing_key, durable=True)
    return [consumer([queue], callbacks=[self.on_message])]
def create_partitioned_queues(name):
    exchange = Exchange(name, type='direct')
    # Only a single partition for now; widen the range to add more.
    for num in range(1):
        CELERY_QUEUES.append(Queue(
            '{0}-{1}'.format(name, num),
            exchange=exchange,
        ))
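
A usage sketch, assuming CELERY_QUEUES is the module-level list handed to the Celery configuration:

CELERY_QUEUES = []
create_partitioned_queues('events')
# CELERY_QUEUES now holds one Queue named 'events-0' bound to a direct exchange.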
def _create_fanout_exchange(self, Consumer, channel):
    """
    Creates a fanout exchange and an exclusive queue to accept notifications.
    """
    exchange_name = '{}_fanout'.format(self.exchange)
    log.debug('Declaring fanout exchange %s', exchange_name)
    exchange = kombu.Exchange(
        name=exchange_name,
        channel=channel,
        durable=False,
        type='fanout'
    )
    exchange.declare()
    queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
    log.debug('Declaring fanout queue %s', queue_name)
    queue = kombu.Queue(
        name=queue_name,
        exchange=exchange,
        exclusive=True,
        durable=False,
        channel=channel
    )
    queue.declare()
    consumer = Consumer(
        queues=[queue],
        on_message=self._on_broadcast,
        no_ack=True
    )
    return consumer
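
A hedged usage sketch (the service object and its exchange attribute are assumptions; kombu.Consumer is passed in so the method can build the consumer on the given channel):

import kombu

connection = kombu.Connection('amqp://guest:guest@localhost//')  # placeholder URL
channel = connection.channel()
consumer = service._create_fanout_exchange(kombu.Consumer, channel)
with consumer:
    connection.drain_events(timeout=5)  # handle one batch of broadcasts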
def __init__(
        self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    :param exchange_name: Name of the topic exchange.
    :type exchange_name: str
    :param connection_url: Kombu connection url.
    :type connection_url: str
    :param qkwargs: One or more dicts of keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing {}'.format(name))
    # Read the configuration file; fall back to it for any missing arguments
    self._config_data = read_config_file(
        config_file, self._default_config_file)
    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug(
            'Using connection_url=%s from config file', connection_url)
    if exchange_name is None and 'exchange_name' in self._config_data:
        exchange_name = self._config_data.get('exchange_name')
        self.logger.debug(
            'Using exchange_name=%s from config file', exchange_name)
    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(
        exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()
    # Set up queues on the same channel and exchange
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())
    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Initializing of {} finished'.format(name))
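
An illustrative construction of this Service: each dict in qkwargs is passed straight to kombu.Queue, so it can carry any Queue keyword arguments (all values below are placeholders, not from the source):

service = Service(
    exchange_name='service.topic',
    connection_url='amqp://guest:guest@localhost//',
    qkwargs=[{'name': 'work', 'routing_key': 'work.*'}],
)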
def setup_entities(self):
    """
    Declare all required entities.
    No advanced error handling yet (e.g. declaration failing because an
    entity already exists with altered properties).
    """
    # Return early if already initialized
    if self._service_inited:
        return
    # Set up the exchange
    self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
                                      type='topic',
                                      durable=True)
    # Set up the durable queues
    self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
                            exchange=self._booking_exchange,
                            routing_key=constants.WORK_QUEUE + ".#",
                            durable=True)
    self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
                             exchange=self._booking_exchange,
                             routing_key=constants.RETRY_QUEUE + ".#",
                             durable=True)
    self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
                           exchange=self._booking_exchange,
                           routing_key=constants.DEAD_LETTER_QUEUE + ".#",
                           durable=True)
    # A buffer queue is needed by the error-queue consumer to temporarily
    # buffer messages for processing; this handles the retry loop that can
    # occur between the retry queue and the work queue.
    # TODO: implement an alternative, as the copy overhead can be
    # significant when the error queue is large.
    self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
                              exchange=self._booking_exchange,
                              routing_key='buffer.#',
                              durable=True)
    # TODO: do we need to make confirm_publish configurable?
    self._conn = Connection(self.get_config().rabbitmq_url,
                            transport_options={'confirm_publish': True})
    # Declare all the exchanges and queues needed (declare, don't overwrite existing)
    for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
                   self.dlq_queue, self.buffer_queue]:
        entity.maybe_bind(self._conn)
        entity.declare()
    # Set up a producer to push to the retry and dead-letter queues
    self._producer = Producer(channel=self._conn.channel(),
                              exchange=self._booking_exchange)
    self._service_inited = True
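
A hedged sketch of how the producer created above might push a message back to the retry queue; the method name and routing-key suffix are assumptions based on the ".#" bindings declared in setup_entities:

def requeue_for_retry(self, body):
    # Any suffix after RETRY_QUEUE matches the RETRY_QUEUE + ".#" binding.
    self._producer.publish(body,
                           routing_key=constants.RETRY_QUEUE + '.message',
                           retry=True)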