KafkaMqCollect.java source code

java
Project: light_drtc
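// Excerpt from KafkaMqCollect; the enclosing class needs these imports
// (legacy Kafka high-level consumer API, kafka.consumer.*).
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public void collectMq() {
    // Request a single stream (one consumer thread) for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(Constants.kfTopic, 1);

    // Decode both message keys and values as strings.
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    // Only one stream was requested above, so take the first.
    KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    // hasNext() blocks until a message arrives unless consumer.timeout.ms is set.
    while (it.hasNext()) {
        MessageAndMetadata<String, String> msgMeta = it.next();
        super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message());
        //System.out.println(msgMeta.key()+"\t"+msgMeta.message());
    }
}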