def setup_entities(self):
"""
declare all required entities
no advanced error handling yet (like error on declaration with altered properties etc)
"""
# return if already inited
if self._service_inited:
return
# setup exchange
self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
type='topic',
durable=True)
# setup durable queues
self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.WORK_QUEUE + ".#",
durable=True)
self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.RETRY_QUEUE + ".#",
durable=True)
self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.DEAD_LETTER_QUEUE + ".#",
durable=True)
# a buffer queue is needed by error-queue-consumer to temp-buffer msgs for processing
# this is to handle retry loop which may cause between retry-queue and work-queue.
# todo: Need to implement an alternive as this has a copy overhead
# which can be significant when the error queue is large
self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
exchange=self._booking_exchange,
routing_key='buffer.#',
durable=True)
# todo: do we need to make confirm_publish configurable?
self._conn = Connection(self.get_config().rabbitmq_url,
transport_options={'confirm_publish': True})
# declare all the exchanges and queues needed (declare, not overwrite existing)
for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
self.dlq_queue, self.buffer_queue]:
entity.maybe_bind(self._conn)
entity.declare()
# setup producer to push to error and dlqs
self._producer = Producer(channel=self._conn.channel(),
exchange=self._booking_exchange)
self._service_inited = True
评论列表
文章目录