/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(CONSUMER_OFFSET_TOPIC, new Integer(1));
KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(CONSUMER_OFFSET_TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
while (true) {
MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {
try {
GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
if (offsetMsg.message() == null) {
continue;
}
kafka.common.OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
kafkaConsumerOffsets.put(commitKey, commitValue);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
KafkaOffsetGetter.java 文件源码
java
阅读 20
收藏 0
点赞 0
评论 0
项目:Kafka-Insight
作者:
评论列表
文章目录