def _handle_commit_error(self, failure, retry_delay, attempt):
    """Handle a failed commit request: finish up, give up, or retry.

    What happens depends on the failure and on the attempt count:

    * The request was cancelled while we are stopping: this is not
      treated as an error, and ``self._last_committed_offset`` is passed
      to ``self._deliver_commit_result``.
    * The failure is not a ``KafkaError``: it is logged at error level and
      the failure is passed to ``self._deliver_commit_result``.
    * ``self.request_retry_max_attempts`` is non-zero and ``attempt`` has
      reached it: the failure is passed to
      ``self._deliver_commit_result``.
    * Otherwise ``self._send_commit_request`` is scheduled on the clock
      with a longer delay and ``attempt + 1``.

    :param failure: the failure being handled; it must provide ``check()``
        and ``getBriefTraceback()`` (presumably a Twisted ``Failure`` --
        confirm against the caller).
    :param retry_delay: the current retry delay; it is multiplied by
        ``REQUEST_RETRY_FACTOR`` and capped at ``self.retry_max_delay`` to
        obtain the delay before the next attempt.
    :param attempt: attempt counter for the request that failed; the
        retry is issued with ``attempt + 1``.
    :returns: whatever ``self._deliver_commit_result`` returns when no
        retry is scheduled, otherwise ``None``.
    """
    # A cancellation that arrives while we are shutting down is expected,
    # so report the last committed offset instead of the failure.
    if self._stopping and failure.check(CancelledError):
        return self._deliver_commit_result(self._last_committed_offset)

    # Only Kafka errors are retried; anything else is handed straight
    # back.  (A tighter check on which errors are actually worth retrying
    # may be possible here.)
    if not failure.check(KafkaError):
        log.error("Unhandleable failure during commit attempt: %r\n\t%r",
                  failure, failure.getBriefTraceback())
        return self._deliver_commit_result(failure)

    # A limit of zero disables the cap on the number of attempts.
    max_attempts = self.request_retry_max_attempts
    if max_attempts != 0 and attempt >= max_attempts:
        log.debug("%r: Exhausted attempts: %d to commit offset: %r",
                  self, max_attempts, failure)
        return self._deliver_commit_result(failure)

    # Stay at debug level while the delay is still below its maximum.
    # After that, warn on odd-numbered attempts only.
    # NOTE: reportedly about one warning per minute with the default
    # timings -- not verifiable from this block.
    below_max_delay = retry_delay < self.retry_max_delay
    if below_max_delay or attempt % 2 == 0:
        emit = log.debug
        template = "%r: Failure committing offset to kafka: %r"
    else:
        emit = log.warning
        template = "%r: Still failing committing offset to kafka: %r"
    emit(template, self, failure)

    # Grow the delay, cap it, and schedule the next commit attempt.
    next_delay = min(retry_delay * REQUEST_RETRY_FACTOR,
                     self.retry_max_delay)
    clock = self._get_clock()
    self._commit_call = clock.callLater(
        next_delay, self._send_commit_request, next_delay, attempt + 1)
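# NOTE: scraped web-page residue followed here (navigation labels meaning
# "comment list" and "table of contents"); it is not part of the code.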