/**
 * Sets up the Kafka consumers and, when configured, the ZooKeeper registration.
 *
 * <p>For every key of {@code topic} a new consumer connector is created from the
 * properties returned by {@code geneConsumerProp()} and stored in
 * {@code consumerConnMap} under that topic name.
 *
 * <p>If {@code distributed} is non-null, the shared {@code ZkDistributed} instance
 * is obtained and {@code zkRegistration()} is invoked on it. Any exception raised
 * during that step is logged and not rethrown.
 */
@SuppressWarnings("unchecked")
public void prepare() {
    Properties consumerProps = geneConsumerProp();

    // One connector per configured topic, each built from its own ConsumerConfig.
    for (String name : topic.keySet()) {
        ConsumerConfig config = new ConsumerConfig(consumerProps);
        ConsumerConnector connector =
                kafka.consumer.Consumer.createJavaConsumerConnector(config);
        consumerConnMap.put(name, connector);
    }

    // Nothing more to do when no distributed setting is present.
    if (distributed == null) {
        return;
    }

    try {
        logger.warn("zkDistributed is start...");
        zkDistributed = ZkDistributed.getSingleZkDistributed(distributed);
        zkDistributed.zkRegistration();
    } catch (Exception e) {
        // Registration failure is reported but does not abort prepare().
        logger.error("zkRegistration fail:{}", ExceptionUtil.getErrorMessage(e));
    }
}
KafkaDistributed.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:jlogstash-input-plugin
作者:
评论列表
文章目录