def drain_events(self, infinity=True):
    """Drain events from ``self.connection`` until ``self._stop`` is set.

    All of ``self.consumers`` are entered as context managers for the
    duration of the loop. Each iteration waits for events for at most
    ``self.timeout``.

    Error handling per iteration:

    * ``socket.timeout`` -- ignored when ``infinity`` is true; otherwise
      the loop is left.
    * connection reported as not connected (and no stop requested) --
      an error is logged, ``self.connect()`` and ``self.revive()`` are
      called, and the loop is left.
    * any other exception that is not a ``HandlerError`` (and no stop
      requested) -- the traceback is logged, ``self.revive()`` is called,
      and the loop is left.
    * ``HandlerError`` -- ignored here; the loop carries on.
    * anything else (a non-handler error while a stop is requested) --
      logged as ``'Unknown error'``.

    :param infinity: when false, the first receive timeout ends the loop
        instead of being retried.
    """
    with nested(*self.consumers):
        while not self._stop:
            try:
                self.connection.drain_events(timeout=self.timeout)
            except socket.timeout:
                # A quiet period only ends the loop in one-shot mode.
                if infinity:
                    continue
                break
            except Exception as exc:
                # Lost connection while still expected to run: reconnect.
                if not (self.connection.connected or self._stop):
                    self.logger.error(
                        'Connection to mq has broken off. Try to reconnect')
                    self.connect()
                    self.revive()
                    break
                # Unexpected failure while still expected to run: revive.
                if not (self._stop or isinstance(exc, HandlerError)):
                    self.logger.exception(
                        'Something wrong! Try to restart the loop')
                    self.revive()
                    break
                # Handler failures are deliberately not acted upon here.
                if isinstance(exc, HandlerError):
                    continue
                self.logger.exception('Unknown error')  # pragma: no cover
    # NOTE(review): the source's indentation was lost; this bookkeeping is
    # assumed to run after the consumers' context has exited -- confirm.
    if self._stop:
        self._stopped = True
        self.logger.info('Stopped draining events.')
评论列表
文章目录