KafkaConsumerRunnable.java source code

java

Project: watchtower-workflow Author:
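// Uses the old high-level consumer API: kafka.consumer.ConsumerIterator,
// kafka.consumer.ConsumerTimeoutException, kafka.message.MessageAndMetadata.
public void run() {
  logger.info("KafkaChannel {} has stream", this.threadNumber);

  final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

  // Loop flag; clearing it from elsewhere ends the loop after the current iteration.
  running = true;

  while (running) {
    try {
      // hasNext() blocks until a message arrives. If consumer.timeout.ms is set,
      // it throws ConsumerTimeoutException once that interval passes with no message.
      if (streamIterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = streamIterator.next();

        byte[] key = messageAndMetadata.key();
        byte[] message = messageAndMetadata.message();

        consumeMessage(key, message);
      }
    } catch (ConsumerTimeoutException cte) {
      logger.debug("Timed out when consuming from Kafka", cte);

      // Report the timeout to the health check, then keep polling.
      KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
    }
  }
}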
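The loop above depends on a `running` flag that something outside `run()` has to clear, but the excerpt does not show how the flag is declared or stopped. The following is a minimal sketch of that stop pattern, not the project's code: the class `PollingLoopSketch`, its `stop()` and `consumed()` methods, and the use of a `BlockingQueue` in place of the Kafka stream are all assumptions made so the example runs without a broker. A timed `poll` stands in for `hasNext()` plus `ConsumerTimeoutException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the runnable above; a BlockingQueue replaces the Kafka stream.
class PollingLoopSketch implements Runnable {
  private final BlockingQueue<byte[]> source;
  private final List<String> consumed = new ArrayList<>();

  // volatile so a stop() call from another thread becomes visible to the loop.
  // Initialized here (not inside run()) so a stop() issued before run() starts is not lost.
  private volatile boolean running = true;

  PollingLoopSketch(BlockingQueue<byte[]> source) {
    this.source = source;
  }

  @Override
  public void run() {
    while (running) {
      try {
        // The timed poll returns null when nothing arrives in time, which lets
        // the loop re-check the flag instead of blocking forever.
        byte[] message = source.poll(50, TimeUnit.MILLISECONDS);
        if (message != null) {
          consumed.add(new String(message, StandardCharsets.UTF_8));
        }
      } catch (InterruptedException ie) {
        // Restore the interrupt status and leave the loop.
        Thread.currentThread().interrupt();
        running = false;
      }
    }
  }

  // Asks the loop to exit after its current iteration.
  void stop() {
    running = false;
  }

  // Safe to read once the worker thread has been joined.
  List<String> consumed() {
    return consumed;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    queue.put("a".getBytes(StandardCharsets.UTF_8));
    queue.put("b".getBytes(StandardCharsets.UTF_8));

    PollingLoopSketch loop = new PollingLoopSketch(queue);
    Thread worker = new Thread(loop);
    worker.start();

    // Wait until both messages have been taken, then request shutdown.
    while (!queue.isEmpty()) {
      Thread.sleep(5);
    }
    loop.stop();
    worker.join();

    System.out.println(loop.consumed());
  }
}
```

Because the poll is bounded, a stop request takes effect within roughly one timeout interval. In the original loop, the same property would hold only if `consumer.timeout.ms` is configured, since otherwise `hasNext()` can block indefinitely.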