def receive(self, timeout: int) -> BrightsideMessage:
self._message = BrightsideMessage(BrightsideMessageHeader(uuid4(), "", BrightsideMessageType.MT_NONE), BrightsideMessageBody(""))
def _consume(cnx: BrokerConnection, timesup: int) -> None:
try:
cnx.drain_events(timeout=timesup)
except kombu_exceptions.TimeoutError:
pass
except(kombu_exceptions.ChannelLimitExceeded,
kombu_exceptions.ConnectionLimitExceeded,
kombu_exceptions.OperationalError,
kombu_exceptions.NotBoundError,
kombu_exceptions.MessageStateError,
kombu_exceptions.LimitExceeded) as err:
raise ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", err)
def _consume_errors(exc, interval: int)-> None:
self._logger.error('Draining error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)
def _read_message(body: str, msg: KombuMessage) -> None:
self._logger.debug("Monitoring event received at: %s headers: %s payload: %s", datetime.utcnow().isoformat(), msg.headers, body)
self._msg = msg
self._message = self._message_factory.create_message(msg)
connection = BrokerConnection(hostname=self._amqp_uri)
with connections[connection].acquire(block=True) as conn:
self._logger.debug('Got connection: %s', conn.as_uri())
with Consumer(conn, queues=[self._queue], callbacks=[_read_message]) as consumer:
consumer.qos(prefetch_count=1)
ensure_kwargs = self.RETRY_OPTIONS.copy()
ensure_kwargs['errback'] = _consume_errors
safe_drain = conn.ensure(consumer, _consume, **ensure_kwargs)
safe_drain(conn, timeout)
return self._message
评论列表
文章目录