@Override
public void run() {
ConsumerIterator<byte[], byte[]> iter = kafkaStream.iterator();
MessageAndMetadata<byte[], byte[]> msg;
int total = 0, fail = 0, success = 0;
long start = System.currentTimeMillis();
while (iter.hasNext()) {
try {
msg = iter.next();
_log.info("Thread {}: {}", threadNum, new String(msg.message(), "utf-8"));
_log.info("partition: {}, offset: {}", msg.partition(), msg.offset());
success++;
} catch (Exception e) {
_log.error("{}", e);
fail++;
}
_log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, ++total,
(System.currentTimeMillis() - start) / 1000);
}
}
ConsumerWorker.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:yuzhouwan
作者:
评论列表
文章目录