def purge(self, timeout: int = 5) -> None:
def _purge_errors(exc, interval):
self._logger.error('Purging error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)
def _purge_messages(cnsmr: BrightsideConsumer):
cnsmr.purge()
self._message = None
connection = BrokerConnection(hostname=self._amqp_uri)
with connections[connection].acquire(block=True) as conn:
self._logger.debug('Got connection: %s', conn.as_uri())
with Consumer(conn, queues=[self._queue], callbacks=[_purge_messages]) as consumer:
ensure_kwargs = self.RETRY_OPTIONS.copy()
ensure_kwargs['errback'] = _purge_errors
safe_purge = conn.ensure(consumer, _purge_messages, **ensure_kwargs)
safe_purge(consumer)
评论列表
文章目录