java类kafka.consumer.ConsumerThreadId的实例源码

ZKOffsetGetter.java 文件源码 项目:kmanager 阅读 49 收藏 0 点赞 0 评论 0
/**
 * Builds a map from topic name to the consumer groups that consume that topic.
 *
 * <p>Each child of the ZooKeeper consumers path is treated as a consumer group. For every group,
 * the topics reported by {@code getConsumersPerTopic} are collected and the group is appended to
 * the list for each of those topics. A group whose lookup throws is logged at WARN level and
 * skipped, so one bad group does not abort the whole scan.
 *
 * @return a mutable map keyed by topic whose values are the consumer groups found for that topic;
 *         empty if no group yields any topic
 */
@Override
public Map<String, List<String>> getActiveTopicMap() {
    Map<String, List<String>> topicGroupsMap = new HashMap<String, List<String>>();
    List<String> consumers = ZKUtils.getChildren(ZkUtils.ConsumersPath());
    for (String consumer : consumers) {
        Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumerThreadIdsByTopic;
        try {
            consumerThreadIdsByTopic = JavaConversions
                    .mapAsJavaMap(ZKUtils.getZKUtilsFromKafka().getConsumersPerTopic(consumer, true));
        } catch (Exception e) {
            // Fixed: the original message lacked a space between the group name and "failed!".
            LOG.warn("getActiveTopicMap-> getConsumersPerTopic for group: " + consumer + " failed! "
                    + e.getMessage());
            // TODO: the content of the /consumers/{group}/ids/{id} node is not in the expected
            // format; this group is problematic.
            continue;
        }
        for (String topic : consumerThreadIdsByTopic.keySet()) {
            // Create the group list on first sight of a topic, then append this group.
            topicGroupsMap.computeIfAbsent(topic, key -> new ArrayList<String>()).add(consumer);
        }
    }
    return topicGroupsMap;
}
ConsumerRebalanceListener.java 文件源码 项目:kafka-0.11.0.0-src-with-comment 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Callback invoked after the new partition assignment is finished but before the fetcher
 * threads start. The new global partition assignment is passed in as a parameter.
 *
 * @param consumerId the consumer id string of the consumer invoking this callback
 * @param globalPartitionAssignment the global partition assignment of this consumer group,
 *                                  structured as a map of topic to (partition to
 *                                  {@code ConsumerThreadId})
 */
public void beforeStartingFetchers(String consumerId, Map<String, Map<Integer, ConsumerThreadId>> globalPartitionAssignment);


问题


面经


文章

微信
公众号

扫码关注公众号