TopicCount.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:druid-kafka-ext 作者:
/**
 * Builds the {@code TopicCount} registered in ZooKeeper for a consumer.
 *
 * <p>The registration data is read from
 * {@code dirs.consumerRegistryDir() + "/" + consumerId} and parsed as a JSON
 * object that must contain a {@code "pattern"} entry and a
 * {@code "subscription"} entry.
 *
 * <p>A {@code StaticTopicCount} is returned when the subscription map is empty
 * or when the pattern equals neither {@code whiteListPattern} nor
 * {@code blackListPattern}. Otherwise the first subscription entry is used as
 * (regex, number of streams) to build a {@code WildcardTopicCount} with a
 * {@code Whitelist} or {@code Blacklist} filter.
 *
 * @param zkClient   connector used to read the registration data
 * @param group      consumer group used to build the registry directory
 * @param consumerId id of the consumer whose registration is read
 * @return the topic count described by the registration data
 * @throws KafkaZKException if the data cannot be read or parsed, or if the
 *         parsed object, its {@code "pattern"} or its {@code "subscription"}
 *         is missing
 */
@SuppressWarnings("unchecked")
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group,
        String consumerId) {
    KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group);
    String subscriptionPattern;
    Map<String, Integer> topMap;
    try {
        String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId);
        ObjectMapper mapper = new ObjectMapper();
        TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() {
        };
        Map<String, Object> jsonObj = mapper.reader(typeMap).readValue(
                topicCountString);
        if (jsonObj == null) {
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        }
        Object pattern = jsonObj.get("pattern");
        if (pattern == null) {
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        }
        subscriptionPattern = (String) pattern;
        Object sub = jsonObj.get("subscription");
        if (sub == null) {
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        }
        // Unchecked cast: the value types are not verified here.
        // NOTE(review): a non-Integer value would only fail later, when read below.
        topMap = (Map<String, Integer>) sub;
    } catch (KafkaZKException e) {
        // Already carries the descriptive message; do not wrap it a second time.
        throw e;
    } catch (Exception e) {
        // Read/parse/cast failures are reported as KafkaZKException; JVM Errors
        // are deliberately left to propagate untouched.
        throw new KafkaZKException(e);
    }

    boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern);
    boolean hasBlackList = blackListPattern.equals(subscriptionPattern);

    if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) {
        return new StaticTopicCount(consumerId, topMap);
    }

    // Wildcard subscription: only the first entry is used (regex -> stream count).
    // The map is known to be non-empty at this point.
    Entry<String, Integer> first = topMap.entrySet().iterator().next();
    String regex = first.getKey();
    Integer numStreams = first.getValue();
    TopicFilter filter = hasWhiteList ? new Whitelist(regex)
            : new Blacklist(regex);

    return new WildcardTopicCount(zkClient, consumerId, filter,
            numStreams);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号