/**
* Modified example from kafka site with some defensive checks added.
*/
private ConsumerIterator<String, String> getStreamIterator() {
Map<String, Integer> topicCountMap = ImmutableMap.of(topic, TOPIC_COUNT);
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap, keyDecoder, msgDecoder);
List<KafkaStream<String, String>> streams = consumerMap.get(topic);
Preconditions.checkNotNull(streams, "There is no topic named : " + topic);
//copy in case of live list returned. Needed for index check below.
ImmutableList<KafkaStream<String, String>> streamsCopy = ImmutableList.copyOf(streams);
Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, streamsCopy.size(),
"Failed to find any KafkaStreams related to topic : " + topic);
KafkaStream<String, String> stream = streamsCopy.get(FIRST_ELEMENT_INDEX);
Preconditions.checkNotNull(stream, "Returned kafka stream is null");
ConsumerIterator<String, String> iterator = stream.iterator();
Preconditions.checkNotNull(iterator, "Returned kafka iterator is null");
return iterator;
}
KafkaRequestIdQueue.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:data-acquisition
作者:
评论列表
文章目录