consumer.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:afkak 作者: ciena 项目源码 文件源码
def _handle_commit_error(self, failure, retry_delay, attempt):
        """ Retry the commit request, depending on failure type

        Depending on the type of the failure, we retry the commit request
        with the latest processed offset, or callback/errback self._commit_ds
        """
        # Check if we are stopping and the request was cancelled
        if self._stopping and failure.check(CancelledError):
            # Not really an error
            return self._deliver_commit_result(self._last_committed_offset)

        # Check that the failure type is a Kafka error...this could maybe be
        # a tighter check to determine whether a retry will succeed...
        if not failure.check(KafkaError):
            log.error("Unhandleable failure during commit attempt: %r\n\t%r",
                      failure, failure.getBriefTraceback())
            return self._deliver_commit_result(failure)

        # Do we need to abort?
        if (self.request_retry_max_attempts != 0 and
                attempt >= self.request_retry_max_attempts):
            log.debug("%r: Exhausted attempts: %d to commit offset: %r",
                      self, self.request_retry_max_attempts, failure)
            return self._deliver_commit_result(failure)

        # Check the retry_delay to see if we should log at the higher level
        # Using attempts % 2 gets us 1-warn/minute with defaults timings
        if (retry_delay < self.retry_max_delay or 0 == (attempt % 2)):
            log.debug("%r: Failure committing offset to kafka: %r", self,
                      failure)
        else:
            # We've retried until we hit the max delay, log alternately at warn
            log.warning("%r: Still failing committing offset to kafka: %r",
                        self, failure)

        # Schedule a delayed call to retry the commit
        retry_delay = min(retry_delay * REQUEST_RETRY_FACTOR,
                          self.retry_max_delay)
        self._commit_call = self._get_clock().callLater(
            retry_delay, self._send_commit_request, retry_delay, attempt + 1)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号