public static ConsumerConnector getConsumer(String groupId) {
//加上线程名字的考虑是:保证每个线程只有一个Consumer,但是每个线程又可以有一个独立的Consumer,从而消费不同的partition
String consumerKey = groupId + "|" + Thread.currentThread().getName();
ConsumerConnector msgConnector = groupConsumers.get(consumerKey);
if (msgConnector == null) {
try {
consumerLock.lock();
msgConnector = groupConsumers.get(consumerKey);
if (msgConnector == null) {
msgConnector = Consumer.createJavaConsumerConnector(getConsumerRealConfig(groupId));
groupConsumers.put(consumerKey, msgConnector);
}
} finally {
consumerLock.unlock();
}
}
return msgConnector;
}
KafkaHelper.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:easyframe-msg
作者:
评论列表
文章目录