/**
 * Consumes messages from {@code m_stream} until the stream's iterator reports
 * that no more messages are available, or until iteration itself fails.
 *
 * <p>Each message payload is converted to a {@code String} using
 * {@code kafkaInput.encoding}, decoded into an event map by {@code decoder},
 * and then either routed through {@code zkDistributed} (when it is non-null)
 * or handed directly to {@code kafkaInput.process}.
 *
 * <p>A failure while handling a single message is logged and that message is
 * skipped; the loop continues with the next one. A failure raised by the
 * iterator itself ({@code hasNext()}) is logged and ends this method.
 */
public void run() {
    try {
        // Obtain the iterator once. With the Kafka high-level consumer the stream
        // hands back the same iterator on every call, and once hasNext() has
        // returned false (stream shut down) it keeps returning false. Wrapping
        // this in an endless outer loop therefore cannot resume consumption; it
        // only busy-spins the thread at full CPU after shutdown.
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            // Kept outside the inner try so the failure log can show the raw
            // text when decoding or processing fails (null if next() itself failed).
            String m = null;
            try {
                m = new String(it.next().message(), this.kafkaInput.encoding);
                Map<String, Object> event = this.decoder.decode(m);
                if (zkDistributed == null) {
                    // No distributed router configured: process locally.
                    this.kafkaInput.process(event);
                } else {
                    zkDistributed.route(event);
                }
            } catch (Exception e) {
                // Best effort per message: log and move on to the next one.
                logger.error("process event:{} failed:{}", m, ExceptionUtil.getErrorMessage(e));
            }
        }
    } catch (Exception t) {
        // Raised by the iterator (hasNext()); the consumer loop ends here.
        logger.error("kakfa Consumer fetch is error:{}", ExceptionUtil.getErrorMessage(t));
    }
}
KafkaDistributed.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:jlogstash-input-plugin
作者:
评论列表
文章目录