KafkaConsumerRunnable.java source code

java

Project: watchtower-automation Author:
public void run() {
  logger.debug("KafkaChannel {} has stream", this.threadNumber);
  final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

  running = true;

  while (running) {
    try {
      if (streamIterator.hasNext()) {
        final byte[] message = streamIterator.next().message();

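        // byte[].toString() prints only the array's identity, not its contents,
        // so decode the payload for logging (UTF-8 is assumed here).
        logger.debug("Thread {}: {}", threadNumber,
            new String(message, java.nio.charset.StandardCharsets.UTF_8));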

        consumeMessage(message);
      }
    } catch (ConsumerTimeoutException cte) {
      logger.debug("Timed out when consuming from Kafka", cte);

      KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
    }
  }
}