ConsumerWorker.java 文件源码

java
阅读 18 收藏 0 点赞 0 评论 0

项目:yuzhouwan 作者:
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> iter = kafkaStream.iterator();
    MessageAndMetadata<byte[], byte[]> msg;
    int total = 0, fail = 0, success = 0;
    long start = System.currentTimeMillis();
    while (iter.hasNext()) {
        try {
            msg = iter.next();
            _log.info("Thread {}: {}", threadNum, new String(msg.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", msg.partition(), msg.offset());
            success++;
        } catch (Exception e) {
            _log.error("{}", e);
            fail++;
        }
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, ++total,
                (System.currentTimeMillis() - start) / 1000);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号