@Override
public Map<String, List<String>> getActiveTopicMap() {
Map<String, List<String>> topicGroupsMap = new HashMap<String, List<String>>();
List<String> consumers = ZKUtils.getChildren(ZkUtils.ConsumersPath());
for (String consumer : consumers) {
Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumer_consumerThreadId = null;
try {
consumer_consumerThreadId = JavaConversions
.mapAsJavaMap(ZKUtils.getZKUtilsFromKafka().getConsumersPerTopic(consumer, true));
} catch (Exception e) {
LOG.warn("getActiveTopicMap-> getConsumersPerTopic for group: " + consumer + "failed! "
+ e.getMessage());
// TODO /consumers/{group}/ids/{id} 节点的内容不符合要求。这个group有问题
continue;
}
Set<String> topics = consumer_consumerThreadId.keySet();
topics.forEach(topic -> {
List<String> _groups = null;
if (topicGroupsMap.containsKey(topic)) {
_groups = topicGroupsMap.get(topic);
_groups.add(consumer);
} else {
_groups = new ArrayList<String>();
_groups.add(consumer);
}
topicGroupsMap.put(topic, _groups);
});
}
return topicGroupsMap;
}
ZKOffsetGetter.java 文件源码
java
阅读 51
收藏 0
点赞 0
评论 0
项目:kmanager
作者:
评论列表
文章目录