KafkaDistributed.java 文件源码

java
阅读 27 收藏 0 点赞 0 评论 0

项目:jlogstash-input-plugin 作者:
public void reconnConsumer(String topicName){

        //停止topic 对应的conn
        ConsumerConnector consumerConn = consumerConnMap.get(topicName);
        consumerConn.commitOffsets(true);
        consumerConn.shutdown();
        consumerConnMap.remove(topicName);

        //停止topic 对应的stream消耗线程
        ExecutorService es = executorMap.get(topicName);
        es.shutdownNow();
        executorMap.remove(topicName);

        Properties prop = geneConsumerProp();
        ConsumerConnector newConsumerConn = kafka.consumer.Consumer
                .createJavaConsumerConnector(new ConsumerConfig(prop));
        consumerConnMap.put(topicName, newConsumerConn);

        addNewConsumer(topicName, topic.get(topicName));
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号