/**
* To avoid too many try-catches this is separate..
*/
public void runrun() {
log.info("Waiting to consume data");
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//loop through all messages in the stream
while (it.hasNext()) {
byte[] msg = it.next().message();
if (msg.length < 2) {
//valid messages are longer than 2 bytes as the first one is schema id
//once upon time some libraries (pypro) would start with a short message to try if the kafka topic was alive. this is what topic polling refers to.
log.info("ignoring short msg, assuming topic polling");
continue;
}
// log.trace("Thread " + id + ":: " + Arrays.toString(msg));
process(msg);
}
log.info("Shutting down consumer Thread: " + id);
}
InFluxAvroConsumer.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:kafka-consumer
作者:
评论列表
文章目录