/**
 * Builds a map from topic name to the consumer groups registered for that topic.
 *
 * <p>Every child of the consumers path is treated as a consumer group, and its per-topic
 * consumer data is looked up through the Kafka ZK utilities. A group whose data cannot be read
 * is logged and skipped, so one broken group does not fail the whole lookup.
 *
 * @return a mutable map of topic name to the groups found for it; empty when no group
 *         yielded any topic
 */
@Override
public Map<String, List<String>> getActiveTopicMap() {
    Map<String, List<String>> topicGroupsMap = new HashMap<>();
    List<String> consumers = ZKUtils.getChildren(ZkUtils.ConsumersPath());
    for (String consumer : consumers) {
        Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumersPerTopic;
        try {
            // NOTE(review): the `true` flag is presumably "exclude internal topics" - confirm
            // against the ZkUtils version in use.
            consumersPerTopic = JavaConversions
                    .mapAsJavaMap(ZKUtils.getZKUtilsFromKafka().getConsumersPerTopic(consumer, true));
        } catch (Exception e) {
            // TODO: the content of the /consumers/{group}/ids/{id} node does not match the
            // expected format, i.e. this group is broken. Skip it (best effort).
            // The exception is passed as the cause so the failure type and stack trace are kept.
            LOG.warn("getActiveTopicMap-> getConsumersPerTopic for group: " + consumer + " failed!", e);
            continue;
        }
        // Only the topic names are needed; the consumer thread ids are ignored here.
        for (String topic : consumersPerTopic.keySet()) {
            topicGroupsMap.computeIfAbsent(topic, key -> new ArrayList<>()).add(consumer);
        }
    }
    return topicGroupsMap;
}
java类kafka.consumer.ConsumerThreadId的实例源码
ZKOffsetGetter.java 文件源码
项目:kmanager
阅读 49
收藏 0
点赞 0
评论 0
ConsumerRebalanceListener.java 文件源码
项目:kafka-0.11.0.0-src-with-comment
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Callback invoked after the new partition assignment is finished but before the fetcher
 * threads start. The new global partition assignment is passed in as a parameter.
 *
 * @param consumerId the consumer id string of the consumer invoking this callback
 * @param globalPartitionAssignment the global partition assignment of this consumer group:
 *        a map from topic to a map from partition to {@code ConsumerThreadId}
 */
public void beforeStartingFetchers(String consumerId, Map<String, Map<Integer, ConsumerThreadId>> globalPartitionAssignment);