CollectorTest.java source code

java

Project: cloudinsight-platform-docker Author:
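import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

// Consumes messages from `topic` with the legacy high-level Kafka consumer and prints each one.
// `consumer`, `topic`, and createConsumerConfig() are members of the enclosing class (not shown).
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    // Request a single stream (one consumer thread) for the topic.
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap =
            consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));

    KafkaStream<String, String> stream = streamMap.get(topic).get(0);

    // By default hasNext() blocks until the next message arrives.
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());
    }
}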
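
The snippet calls createConsumerConfig() but does not show it. As a rough sketch only, the properties such a helper might assemble for the legacy ZooKeeper-based consumer could look like the following. The class name, method name, parameter names, and all values here are hypothetical and are not taken from the project. Presumably the real method would wrap the result in `new kafka.consumer.ConsumerConfig(props)`.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical helper: builds the settings the legacy high-level consumer reads.
    // The values below are illustrative assumptions, not the project's actual settings.
    static Properties consumerProperties(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);   // ZooKeeper host:port list
        props.setProperty("group.id", groupId);                     // consumer group name
        props.setProperty("zookeeper.session.timeout.ms", "6000");  // assumed value
        props.setProperty("auto.commit.interval.ms", "1000");       // assumed value
        props.setProperty("auto.offset.reset", "smallest");         // start from the earliest offset
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:2181", "collector-test");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keeping the property assembly in a plain `Properties`-returning method lets the settings be checked without a running broker or ZooKeeper.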