ZKOffsetGetter.java source code

java

Project: kmanager Author:
@Override
public Map<String, List<String>> getActiveTopicMap() {
    Map<String, List<String>> topicGroupsMap = new HashMap<String, List<String>>();
    List<String> consumers = ZKUtils.getChildren(ZkUtils.ConsumersPath());
    for (String consumer : consumers) {
        Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumer_consumerThreadId = null;
        try {
            consumer_consumerThreadId = JavaConversions
                    .mapAsJavaMap(ZKUtils.getZKUtilsFromKafka().getConsumersPerTopic(consumer, true));
        } catch (Exception e) {
            LOG.warn("getActiveTopicMap-> getConsumersPerTopic for group: " + consumer + " failed! "
                    + e.getMessage());
            // TODO: the content of the /consumers/{group}/ids/{id} node is malformed, so this group is broken; skip it
            continue;
        }
        Set<String> topics = consumer_consumerThreadId.keySet();
        // Register this consumer group under every topic it subscribes to,
        // creating the topic's group list on first sight.
        topics.forEach(topic ->
                topicGroupsMap.computeIfAbsent(topic, k -> new ArrayList<String>()).add(consumer));
    }
    return topicGroupsMap;
}
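
The method above inverts a "group -> topics" relation read from ZooKeeper into a "topic -> groups" map. The sketch below isolates that inversion using only standard collections, so it can be run without Kafka or ZooKeeper. The class name `TopicGroupIndex`, the method `invert`, and the input shape are assumptions made for illustration; they are not part of kmanager.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of kmanager: shows only the map inversion step.
public class TopicGroupIndex {

    /**
     * Inverts a "consumer group -> subscribed topics" map into
     * a "topic -> consumer groups" map, preserving insertion order.
     */
    public static Map<String, List<String>> invert(Map<String, List<String>> topicsByGroup) {
        Map<String, List<String>> groupsByTopic = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : topicsByGroup.entrySet()) {
            String group = entry.getKey();
            for (String topic : entry.getValue()) {
                // Create the list the first time a topic is seen, then append the group.
                groupsByTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(group);
            }
        }
        return groupsByTopic;
    }
}
```

With an input where group `g1` subscribes to `orders` and `payments`, and `g2` subscribes to `orders`, the returned map lists both groups under `orders` and only `g1` under `payments`.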