public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
UnknownHostException {
mConfig = config;
mOffsetTracker = offsetTracker;
mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
}
TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()):
new Whitelist(mConfig.getKafkaTopicFilter());
LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
List<KafkaStream<byte[], byte[]>> streams =
mConsumerConnector.createMessageStreamsByFilter(topicFilter);
KafkaStream<byte[], byte[]> stream = streams.get(0);
mIterator = stream.iterator();
mLastAccessTime = new HashMap<TopicPartition, Long>();
StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
java类kafka.consumer.Blacklist的实例源码
MessageReader.java 文件源码
项目:secor
阅读 25
收藏 0
点赞 0
评论 0
TopicCount.java 文件源码
项目:druid-kafka-ext
阅读 23
收藏 0
点赞 0
评论 0
public String pattern() {
if (topicFilter instanceof Whitelist)
return whiteListPattern;
else if (topicFilter instanceof Blacklist)
return blackListPattern;
else
throw new KafkaZKException("Invalid topicFilter.");
}
Topics.java 文件源码
项目:debezium-proto
阅读 28
收藏 0
点赞 0
评论 0
public static TopicFilter noneOf( String...topics) {
StringJoiner joiner = new StringJoiner(",");
for ( String topic : topics ) {
joiner.add(topic);
}
return new Blacklist(joiner.toString());
}
TopicCount.java 文件源码
项目:druid-kafka-ext
阅读 22
收藏 0
点赞 0
评论 0
@SuppressWarnings("unchecked")
public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group,
String consumerId) {
KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group);
String subscriptionPattern = null;
Map<String, Integer> topMap = null;
try {
String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId);
ObjectMapper mapper = new ObjectMapper();
TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() {
};
Map<String, Object> jsonObj = mapper.reader(typeMap).readValue(
topicCountString);
if (jsonObj == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
Object pattern = jsonObj.get("pattern");
if (pattern == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
subscriptionPattern = (String) pattern;
Object sub = jsonObj.get("subscription");
if (sub == null)
throw new KafkaZKException("error constructing TopicCount : "
+ topicCountString);
topMap = (Map<String, Integer>) sub;
} catch (Throwable t) {
throw new KafkaZKException(t);
}
boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern);
boolean hasBlackList = blackListPattern.equals(subscriptionPattern);
if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) {
return new StaticTopicCount(consumerId, topMap);
} else {
String regex = null;
Integer numStreams = -1;
for (Entry<String, Integer> entity : topMap.entrySet()) {
regex = entity.getKey();
numStreams = entity.getValue();
break;
}
TopicFilter filter = hasWhiteList ? new Whitelist(regex)
: new Blacklist(regex);
return new WildcardTopicCount(zkClient, consumerId, filter,
numStreams);
}
}