/**
 * Creates a Kafka consumer connector and returns the single message stream
 * for the topic derived from {@code gatewayUrl()}.
 *
 * <p>Keys are decoded with {@link StringDecoder} and values with
 * {@link FeatureVectorDecoder}, yielding {@code float[]} payloads.
 *
 * <p>If stream creation or validation fails, the connector created here is
 * shut down before the exception propagates, so a failed bean creation does
 * not leave the connector open.
 *
 * <p>NOTE(review): on the success path the connector is not exposed to the
 * caller, so nothing visible in this method ever shuts it down. Consider
 * managing it as its own bean with a destroy method — confirm against the
 * rest of the configuration class.
 *
 * @return the first stream registered for the topic
 * @throws NullPointerException if the streams map has no entry for the topic
 * @throws IndexOutOfBoundsException if the topic's list of streams is empty
 */
@Bean
protected KafkaStream<String, float[]> kafkaStream() {
    final String topicName = retrieveTopicNameFromGatewayAddress(gatewayUrl());
    final ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(consumerConfig());
    try {
        // Request exactly one stream for the topic.
        Map<String, Integer> topicCounts = new HashMap<>();
        topicCounts.put(topicName, 1);

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        FeatureVectorDecoder valueDecoder = new FeatureVectorDecoder();

        Map<String, List<KafkaStream<String, float[]>>> streams =
                consumerConnector.createMessageStreams(topicCounts, keyDecoder, valueDecoder);
        List<KafkaStream<String, float[]>> streamsByTopic = streams.get(topicName);

        // Template overload: the message is only built when the check fails.
        Preconditions.checkNotNull(streamsByTopic, "Topic %s not found in streams map.", topicName);
        Preconditions.checkElementIndex(0, streamsByTopic.size(),
                String.format("List of streams of topic %s is empty.", topicName));
        return streamsByTopic.get(0);
    } catch (RuntimeException e) {
        // Release the connector on failure; keep the original exception as the primary one.
        try {
            consumerConnector.shutdown();
        } catch (RuntimeException shutdownFailure) {
            e.addSuppressed(shutdownFailure);
        }
        throw e;
    }
}
KafkaConfiguration.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:space-shuttle-demo
作者:
评论列表
文章目录