public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws
UnknownHostException {
mConfig = config;
mOffsetTracker = offsetTracker;
mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());
if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
}
TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()):
new Whitelist(mConfig.getKafkaTopicFilter());
LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);
List<KafkaStream<byte[], byte[]>> streams =
mConsumerConnector.createMessageStreamsByFilter(topicFilter);
KafkaStream<byte[], byte[]> stream = streams.get(0);
mIterator = stream.iterator();
mLastAccessTime = new HashMap<TopicPartition, Long>();
StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();
mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads();
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
MessageReader.java 文件源码
java
阅读 31
收藏 0
点赞 0
评论 0
项目:secor
作者:
评论列表
文章目录