KafkaHelper.java 文件源码

java
阅读 21 收藏 0 点赞 0 评论 0

项目:easyframe-msg 作者:
public static ConsumerConnector getConsumer(String groupId) {
    //加上线程名字的考虑是:保证每个线程只有一个Consumer,但是每个线程又可以有一个独立的Consumer,从而消费不同的partition
    String consumerKey = groupId + "|" + Thread.currentThread().getName();
    ConsumerConnector msgConnector = groupConsumers.get(consumerKey);
    if (msgConnector == null) {
        try {
            consumerLock.lock();
            msgConnector = groupConsumers.get(consumerKey);
            if (msgConnector == null) {
                msgConnector = Consumer.createJavaConsumerConnector(getConsumerRealConfig(groupId));
                groupConsumers.put(consumerKey, msgConnector);
            }
        } finally {
            consumerLock.unlock();
        }
    }

    return msgConnector;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号