BlockingKafkaMessageConsumerCoordinator.java 文件源码

java
阅读 24 收藏 0 点赞 0 评论 0

项目:benchmarkio 作者:
@Override
public CompletionService<Histogram> startConsumers() {
    final ConsumerConfig consumerConfig = new ConsumerConfig(props);

    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // Create message streams
    final Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, numThreads);

    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // Pass each stream to a consumer that will read from the stream in its own thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
    }

    return executorCompletionService;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号