/**
 * Rebuilds the Kafka consumer for a single topic.
 *
 * <p>Any connector currently registered for the topic in {@code consumerConnMap}
 * has its offsets committed and is shut down; any executor registered in
 * {@code executorMap} is stopped via {@code shutdownNow()}. A fresh connector is
 * then created from {@code geneConsumerProp()}, stored in {@code consumerConnMap},
 * and {@code addNewConsumer} is invoked for the topic.
 *
 * <p>If no connector or executor is registered for the topic, the corresponding
 * teardown step is skipped rather than failing with a NullPointerException, so
 * the new consumer is still created.
 *
 * @param topicName name of the topic whose consumer should be re-created
 */
public void reconnConsumer(String topicName){
    // Stop the connector that belongs to this topic (if one is registered).
    ConsumerConnector consumerConn = consumerConnMap.get(topicName);
    if (consumerConn != null) {
        // NOTE(review): the boolean is presumably Kafka's retryOnFailure flag - confirm
        // against the Kafka client version in use.
        consumerConn.commitOffsets(true);
        consumerConn.shutdown();
        consumerConnMap.remove(topicName);
    }

    // Stop the threads consuming this topic's streams (if an executor is registered).
    ExecutorService es = executorMap.get(topicName);
    if (es != null) {
        es.shutdownNow();
        executorMap.remove(topicName);
    }

    // Create and register a replacement connector, then start consuming again.
    Properties prop = geneConsumerProp();
    ConsumerConnector newConsumerConn = kafka.consumer.Consumer
            .createJavaConsumerConnector(new ConsumerConfig(prop));
    consumerConnMap.put(topicName, newConsumerConn);
    // NOTE(review): topic.get(topicName) is passed through unchecked; verify that
    // addNewConsumer tolerates a topic that is missing from the `topic` map.
    addNewConsumer(topicName, topic.get(topicName));
}
Kafka.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:jlogstash-input-plugin
作者:
评论列表
文章目录