orchestrator.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:EasyJobLite 作者: treebohotels 项目源码 文件源码
def setup_entities(self):
        """
        declare all required entities
        no advanced error handling yet (like error on declaration with altered properties etc)
        """
        # return if already inited
        if self._service_inited:
            return

        # setup exchange
        self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
                                          type='topic',
                                          durable=True)

        # setup durable queues
        self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
                                exchange=self._booking_exchange,
                                routing_key=constants.WORK_QUEUE + ".#",
                                durable=True)

        self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
                                 exchange=self._booking_exchange,
                                 routing_key=constants.RETRY_QUEUE + ".#",
                                 durable=True)

        self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
                               exchange=self._booking_exchange,
                               routing_key=constants.DEAD_LETTER_QUEUE + ".#",
                               durable=True)

        # a buffer queue is needed by error-queue-consumer to temp-buffer msgs for processing
        # this is to handle retry loop which may cause between retry-queue and work-queue.
        # todo: Need to implement an alternive as this has a copy overhead
        #  which can be significant when the error queue is large
        self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
                                  exchange=self._booking_exchange,
                                  routing_key='buffer.#',
                                  durable=True)

        # todo: do we need to make confirm_publish configurable?
        self._conn = Connection(self.get_config().rabbitmq_url,
                                transport_options={'confirm_publish': True})

        # declare all the exchanges and queues needed (declare, not overwrite existing)
        for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
                       self.dlq_queue, self.buffer_queue]:
            entity.maybe_bind(self._conn)
            entity.declare()

        # setup producer to push to error and dlqs
        self._producer = Producer(channel=self._conn.channel(),
                                  exchange=self._booking_exchange)

        self._service_inited = True
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号