KafkaConsumerTestBase.java 文件源码

java
阅读 17 收藏 0 点赞 0 评论 0

项目:flink 作者:
/**
 * Reads messages from a Kafka topic into a list, using only Kafka's own consumer code.
 *
 * <p>Exactly one message stream is requested for the topic. Messages are collected until
 * either {@code stopAfter} messages have been read or the stream iterator reports that no
 * further message is available. The consumer connector is always shut down before this
 * method returns or throws, so that it does not leak.
 *
 * <p>NOTE(review): how long {@code hasNext()} blocks, and whether shutting the connector down
 * commits offsets, depends on the supplied {@code config} - confirm against the callers.
 *
 * @param topicName the topic to read from
 * @param config    the Kafka consumer configuration used to create the connector
 * @param stopAfter number of messages after which reading stops; this is an equality check
 *                  performed after each message, so a value of zero or less never triggers
 *                  the early stop
 * @return the messages read, in the order in which the stream iterator delivered them
 * @throws RuntimeException if Kafka does not provide exactly one stream for the topic
 */
private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
    try {
        // we request only one stream per consumer instance. Kafka will make sure that each consumer group
        // will see each message only once.
        Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
        if (streams.size() != 1) {
            throw new RuntimeException("Expected only one message stream but got " + streams.size());
        }
        List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
        if (kafkaStreams == null) {
            throw new RuntimeException("Requested stream not available. Available streams: " + streams);
        }
        if (kafkaStreams.size() != 1) {
            throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
        }
        LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
        ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();

        List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
        int read = 0;
        while (iteratorToRead.hasNext()) {
            read++;
            result.add(iteratorToRead.next());
            if (read == stopAfter) {
                LOG.info("Read {} elements", read);
                break;
            }
        }
        return result;
    } finally {
        // Release the connector's resources on every exit path, including the validation failures above.
        consumerConnector.shutdown();
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号