gateway.py 文件源码

python
阅读 33 收藏 0 点赞 0 评论 0

项目:Brightside 作者: BrighterCommand 项目源码 文件源码
def receive(self, timeout: int) -> BrightsideMessage:
        """Wait for at most one message on the configured queue and return it.

        The method first resets ``self._message`` to an empty placeholder
        (``MT_NONE`` header, empty body). If nothing arrives before the
        timeout expires, that placeholder is what the caller gets back.

        Args:
            timeout: Passed to ``drain_events`` as its ``timeout`` argument.

        Returns:
            The message built by ``self._message_factory`` from the received
            broker message, or the empty ``MT_NONE`` placeholder when the
            drain timed out.

        Raises:
            ChannelFailureException: If draining fails with one of the kombu
                errors listed in ``_consume``. The original kombu error is
                passed as the second argument and set as ``__cause__``.
        """

        # Start from an empty message so a timeout yields a well-formed result.
        self._message = BrightsideMessage(BrightsideMessageHeader(uuid4(), "", BrightsideMessageType.MT_NONE), BrightsideMessageBody(""))

        def _consume(cnx: BrokerConnection, timesup: int) -> None:
            """Drain events once; a timeout is an expected, non-error outcome."""
            try:
                cnx.drain_events(timeout=timesup)
            except kombu_exceptions.TimeoutError:
                # Nothing arrived in time: keep the placeholder message.
                pass
            except(kombu_exceptions.ChannelLimitExceeded,
                   kombu_exceptions.ConnectionLimitExceeded,
                   kombu_exceptions.OperationalError,
                   kombu_exceptions.NotBoundError,
                   kombu_exceptions.MessageStateError,
                   kombu_exceptions.LimitExceeded) as err:
                # Chain explicitly so the broker error is recorded as the direct
                # cause, matching the "see inner exception" wording.
                raise ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", err) from err

        def _consume_errors(exc, interval: int)-> None:
            """Errback handed to ``conn.ensure``: log the failure and the retry delay."""
            self._logger.error('Draining error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)

        def _read_message(body: str, msg: KombuMessage) -> None:
            """Consumer callback: keep the raw broker message and its converted form."""
            self._logger.debug("Monitoring event received at: %s headers: %s payload: %s", datetime.utcnow().isoformat(), msg.headers, body)
            # The raw message is kept on the instance
            # NOTE(review): presumably so it can be acknowledged later - confirm.
            self._msg = msg
            self._message = self._message_factory.create_message(msg)

        connection = BrokerConnection(hostname=self._amqp_uri)
        # Block until the pool keyed by this connection hands one out.
        with connections[connection].acquire(block=True) as conn:
            self._logger.debug('Got connection: %s', conn.as_uri())
            with Consumer(conn, queues=[self._queue], callbacks=[_read_message]) as consumer:
                # Ask the broker for one unacknowledged message at a time.
                consumer.qos(prefetch_count=1)
                # Copy so the shared RETRY_OPTIONS mapping is not mutated.
                ensure_kwargs = self.RETRY_OPTIONS.copy()
                ensure_kwargs['errback'] = _consume_errors
                # Wrap the drain in the connection's retry policy.
                safe_drain = conn.ensure(consumer, _consume, **ensure_kwargs)
                safe_drain(conn, timeout)

        return self._message
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号