KafkaMessageConsumer.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:haogrgr-test 作者:
/**
 * 初始化Kafka消费者客户端, 并获取Topic对应的Stream
 */
private void openKafkaStream() {
    logger.info("开始初始化Kafka消费客户端");

    this.consumer = Consumer.createJavaConsumerConnector(getConsumerConfig());

    StringDecoder decoder = new StringDecoder(null);
    Map<String, Integer> topicCountMap = Maps.of(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
            decoder, decoder);

    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    this.stream = streams.get(0);

    Assert.notNull(stream);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号