/**
 * Prepares this spout for use: keeps the output collector, creates the logger,
 * and opens a single Kafka consumer stream whose iterator is stored in {@code it}.
 *
 * @param conf      topology configuration; not read by this method
 * @param context   topology context; not read by this method
 * @param collector output collector, stored in {@code _collector}
 */
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    // Single source of truth for the topic name; it is used both to request
    // the stream and to look the stream up in the returned map.
    final String topic = "sec-stream-one";

    this._collector = collector;

    // Name the logger after this spout's runtime class. The earlier expression
    // BoltCassandra.class.getClass().getName() always produced "java.lang.Class",
    // because getClass() invoked on a Class object returns Class itself.
    this.logger = Logger.getLogger(getClass().getName());

    // Kafka consumer settings.
    Properties props = new Properties();
    props.put("zk.connect", "10.15.62.75:2181");
    props.put("groupid", "sec-group-1");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);

    // NOTE(review): the connector is held only in this local variable, so nothing
    // visible here can shut it down later - confirm whether it should be kept in a field.
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    // Request exactly one stream for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, Integer.valueOf(1));
    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);

    // Only one stream was requested, so take the first (index 0) entry.
    KafkaMessageStream stream = consumerMap.get(topic).get(0);
    this.it = stream.iterator();
}
SpoutKafka.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:CadalWorkspace
作者:
评论列表
文章目录