KafkaRequestIdQueue.java 文件源码

java
阅读 24 收藏 0 点赞 0 评论 0

项目:data-acquisition 作者:
/**
 * Opens the message streams for {@code topic} and hands back the iterator of the
 * first stream. Adapted from the example on the Kafka site, with defensive checks
 * added around every value obtained from the consumer.
 *
 * @return the iterator of the first stream registered for {@code topic}; never null
 * @throws NullPointerException if no stream list exists for the topic, or if the
 *         selected stream or its iterator is null
 * @throws IndexOutOfBoundsException if the stream list for the topic is empty
 */
private ConsumerIterator<String, String> getStreamIterator() {
    final Map<String, Integer> streamsPerTopic = ImmutableMap.of(topic, TOPIC_COUNT);

    // checkNotNull returns its argument, so lookup and validation are one step.
    final List<KafkaStream<String, String>> topicStreams = Preconditions.checkNotNull(
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, msgDecoder).get(topic),
            "There is no topic named : " + topic);

    // Snapshot the list: if the consumer returned a live view, its size could
    // change between the bounds check and the element access below.
    final ImmutableList<KafkaStream<String, String>> snapshot =
            ImmutableList.copyOf(topicStreams);

    // checkElementIndex returns the validated index.
    final int firstIndex = Preconditions.checkElementIndex(
            FIRST_ELEMENT_INDEX,
            snapshot.size(),
            "Failed to find any KafkaStreams related to topic : " + topic);

    final KafkaStream<String, String> firstStream = Preconditions.checkNotNull(
            snapshot.get(firstIndex), "Returned kafka stream is null");

    return Preconditions.checkNotNull(
            firstStream.iterator(), "Returned kafka iterator is null");
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号