/**
* 初始化Kafka消费者客户端, 并获取Topic对应的Stream
*/
private void openKafkaStream() {
logger.info("开始初始化Kafka消费客户端");
this.consumer = Consumer.createJavaConsumerConnector(getConsumerConfig());
StringDecoder decoder = new StringDecoder(null);
Map<String, Integer> topicCountMap = Maps.of(topic, 1);
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
decoder, decoder);
List<KafkaStream<String, String>> streams = consumerMap.get(topic);
this.stream = streams.get(0);
Assert.notNull(stream);
}
KafkaMessageConsumer.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:haogrgr-test
作者:
评论列表
文章目录