def declare_exchange(self, name, type='direct', queues=None, **options):
    """Declare an exchange and bind the listed queues to it.

    An object is acquired from ``self.connections[self.connection]`` and
    used as the channel for the exchange and for every queue. The exchange
    is declared, stored in ``self.exchanges`` under ``name``, and then each
    queue from ``queues`` is declared and bound to the exchange.

    :param name: name of exchange
    :type name: str
    :param type: type of exchange - direct, fanout, topic, match
    :type type: str
    :param queues: pairs of queue name and routing key:
        [[queue_name, routing_key], [queue_name, routing_key], ...];
        ``None`` is treated as an empty list
    :type queues: list, None or tuple
    :param options: additional keyword arguments forwarded to ``Exchange``
    """
    bindings = [] if queues is None else queues
    with self.connections[self.connection].acquire() as acquired:
        declared = Exchange(name, type=type, channel=acquired, **options)
        declared.declare()
        # Stored only after declare() has returned without raising.
        self.exchanges[name] = declared
        for binding in bindings:
            queue_name, binding_key = binding
            bound_queue = Queue(name=queue_name, channel=acquired)
            bound_queue.declare()
            bound_queue.bind_to(exchange=name, routing_key=binding_key)
            # NOTE(review): the log falls back to the queue name when the
            # routing key is falsy, but bind_to() above receives the routing
            # key unchanged - confirm which of the two is intended.
            self.logger.debug(
                'Queue "%s" with routing_key "%s" was bond to exchange "%s"',
                queue_name,
                binding_key or queue_name,
                name,
            )
评论列表
文章目录