java类kafka.consumer.Blacklist的实例源码

MessageReader.java 文件源码 项目:secor 阅读 25 收藏 0 点赞 0 评论 0
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
        UnknownHostException {
    mConfig = config;
    mOffsetTracker = offsetTracker;

    mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());

    if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
        throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
    }
    TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()):
            new Whitelist(mConfig.getKafkaTopicFilter());
    LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
    List<KafkaStream<byte[], byte[]>> streams =
        mConsumerConnector.createMessageStreamsByFilter(topicFilter);
    KafkaStream<byte[], byte[]> stream = streams.get(0);
    mIterator = stream.iterator();
    mLastAccessTime = new HashMap<TopicPartition, Long>();
    StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
    mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
    mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
    mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
TopicCount.java 文件源码 项目:druid-kafka-ext 阅读 23 收藏 0 点赞 0 评论 0
public String pattern() {
    if (topicFilter instanceof Whitelist)
        return whiteListPattern;
    else if (topicFilter instanceof Blacklist)
        return blackListPattern;
    else
        throw new KafkaZKException("Invalid topicFilter.");
}
Topics.java 文件源码 项目:debezium-proto 阅读 28 收藏 0 点赞 0 评论 0
public static TopicFilter noneOf( String...topics) {
    StringJoiner joiner = new StringJoiner(",");
    for ( String topic : topics ) {
        joiner.add(topic);
    }
    return new Blacklist(joiner.toString());
}
TopicCount.java 文件源码 项目:druid-kafka-ext 阅读 22 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group,
        String consumerId) {
    KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group);
    String subscriptionPattern = null;
    Map<String, Integer> topMap = null;
    try {
        String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId);
        ObjectMapper mapper = new ObjectMapper();
        TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() {
        };
        Map<String, Object> jsonObj = mapper.reader(typeMap).readValue(
                topicCountString);
        if (jsonObj == null)
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        Object pattern = jsonObj.get("pattern");
        if (pattern == null)
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        subscriptionPattern = (String) pattern;
        Object sub = jsonObj.get("subscription");
        if (sub == null)
            throw new KafkaZKException("error constructing TopicCount : "
                    + topicCountString);
        topMap = (Map<String, Integer>) sub;

    } catch (Throwable t) {
        throw new KafkaZKException(t);
    }

    boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern);
    boolean hasBlackList = blackListPattern.equals(subscriptionPattern);

    if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) {
        return new StaticTopicCount(consumerId, topMap);
    } else {
        String regex = null;
        Integer numStreams = -1;
        for (Entry<String, Integer> entity : topMap.entrySet()) {
            regex = entity.getKey();
            numStreams = entity.getValue();
            break;
        }
        TopicFilter filter = hasWhiteList ? new Whitelist(regex)
                : new Blacklist(regex);

        return new WildcardTopicCount(zkClient, consumerId, filter,
                numStreams);
    }

}


问题


面经


文章

微信
公众号

扫码关注公众号