/**
 * Reads messages from the Kafka topic named by {@code Constants.kfTopic} and
 * hands each message's key and payload to {@code mqTimer.parseMqText}.
 *
 * <p>A single stream is requested for the topic, and both keys and values are
 * decoded with {@code StringDecoder}. The loop runs for as long as the stream
 * iterator reports another message.
 *
 * <p>NOTE(review): with a default consumer configuration the iterator
 * presumably blocks while waiting for new messages, so this method is not
 * expected to return in normal operation — confirm against the consumer's
 * {@code consumer.timeout.ms} setting.
 */
public void collectMq(){
    // Request exactly one stream for the configured topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // Integer.valueOf replaces the deprecated Integer(int) constructor.
    topicCountMap.put(Constants.kfTopic, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    // Only one stream was requested above, so index 0 is the stream to read.
    KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata<String, String> msgMeta = it.next();
        super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message());
    }
}
KafkaMqCollect.java 文件源码
java
阅读 22
收藏 0
点赞 0
评论 0
项目:light_drtc
作者:
评论列表
文章目录