private void KafkaInit(){
Properties props = new Properties();
props.put("zookeeper.connect", "10.15.62.75:2181,10.15.62.76:2181,10.15.62.77:2181");
props.put("group.id", "SearchTerm");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("Search-query", new Integer(1));// 第二个参数是指用几个流,多个流是为了并行处理。
Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("Search-query").get(0);// 这里只有一个流,所以得get(0)就可以了。
this.it = stream.iterator();
}
SearchTermSpout.java 文件源码
java
阅读 15
收藏 0
点赞 0
评论 0
项目:CadalWorkspace
作者:
评论列表
文章目录