/**
 * Creates the Kafka consumer connector from {@code props}, requests
 * {@code numThreads} message streams for {@code topic}, and submits one
 * {@link BlockingKafkaMessageConsumer} per stream to the completion service.
 *
 * @return the completion service the consumer tasks were submitted to
 */
@Override
public CompletionService<Histogram> startConsumers() {
    consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask the connector for numThreads streams on the single configured topic.
    final Map<String, Integer> streamCountByTopic = new HashMap<>();
    streamCountByTopic.put(topic, numThreads);

    final List<KafkaStream<byte[], byte[]>> topicStreams =
            consumerConnector.createMessageStreams(streamCountByTopic).get(topic);

    // One consumer task per stream; each task reads from its own stream.
    for (final KafkaStream<byte[], byte[]> topicStream : topicStreams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(topicStream));
    }

    return executorCompletionService;
}
BlockingKafkaMessageConsumerCoordinator.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:benchmarkio
作者:
评论列表
文章目录