Kafka.java 文件源码

java
阅读 21 收藏 0 点赞 0 评论 0

项目:jlogstash-input-plugin 作者:
public void reconnConsumer(String topicName){

    //停止topic 对应的conn
    ConsumerConnector consumerConn = consumerConnMap.get(topicName);
    consumerConn.commitOffsets(true);
    consumerConn.shutdown();
    consumerConnMap.remove(topicName);

    //停止topic 对应的stream消耗线程
    ExecutorService es = executorMap.get(topicName);
    es.shutdownNow();   
    executorMap.remove(topicName);

    Properties prop = geneConsumerProp();
    ConsumerConnector newConsumerConn = kafka.consumer.Consumer
            .createJavaConsumerConnector(new ConsumerConfig(prop));
    consumerConnMap.put(topicName, newConsumerConn);

    addNewConsumer(topicName, topic.get(topicName));
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号