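import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

// Consumes messages from `topic` with the old high-level consumer and prints each key and payload.
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    // Request one stream (one consumer thread) for the topic.
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap =
            consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = streamMap.get(topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    // hasNext() blocks until a message arrives unless consumer.timeout.ms is set.
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());
    }
}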
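
The `recv()` method calls `createConsumerConfig()`, which is not shown in this snippet. The following is a minimal sketch of the properties such a method might assemble, not the project's actual implementation. The class name `ConsumerProps`, the method name `consumerProperties`, and all values are hypothetical; the property keys are the ones used by the old ZooKeeper-based high-level consumer.

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical helper: builds the settings that a createConsumerConfig()
    // method could wrap in a kafka.consumer.ConsumerConfig.
    static Properties consumerProperties(String zkConnect, String groupId) {
        Properties props = new Properties();
        // ZooKeeper ensemble the old high-level consumer coordinates through.
        props.put("zookeeper.connect", zkConnect);
        // Consumers sharing a group.id split the topic's partitions among themselves.
        props.put("group.id", groupId);
        // Start from the earliest offset when the group has no committed offset (assumed choice).
        props.put("auto.offset.reset", "smallest");
        // Make the stream iterator time out after 5 s instead of blocking forever (assumed choice).
        props.put("consumer.timeout.ms", "5000");
        return props;
    }
}
```

The resulting `Properties` object could then be passed to `new kafka.consumer.ConsumerConfig(props)`. With `consumer.timeout.ms` set, `it.hasNext()` throws a `ConsumerTimeoutException` when no message arrives within the timeout, so the loop in `recv()` would need to catch it.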
CollectorTest.java source file
Project: cloudinsight-platform-docker