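public void run() {
    logger.debug("KafkaChannel {} has stream", this.threadNumber);
    final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
    running = true;
    while (running) {
        try {
            // hasNext() blocks until a message arrives or consumer.timeout.ms elapses.
            if (streamIterator.hasNext()) {
                final byte[] message = streamIterator.next().message();
                // byte[].toString() logs only the array's identity hash, so decode the payload instead
                // (assumes UTF-8 text; needs java.nio.charset.StandardCharsets).
                logger.debug("Thread {}: {}", threadNumber, new String(message, StandardCharsets.UTF_8));
                consumeMessage(message);
            }
        } catch (ConsumerTimeoutException cte) {
            // No message arrived within the timeout: flag the health check and keep polling.
            logger.debug("Timed out when consuming from Kafka", cte);
            KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
        }
    }
}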
KafkaConsumerRunnable.java source code
java
Project: watchtower-automation