SpoutKafka.java 文件源码

java
阅读 23 收藏 0 点赞 0 评论 0

项目:CadalWorkspace 作者:
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this._collector = collector;
    this.logger = Logger.getLogger(BoltCassandra.class.getClass().getName());

    // Construct kafka part
    Properties props = new Properties();
    props.put("zk.connect", "10.15.62.75:2181");
    props.put("groupid", "sec-group-1");        // 

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("sec-stream-one", new Integer(1));        // 

    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);

    KafkaMessageStream stream = consumerMap.get("sec-stream-one").get(0);       //

    this.it = stream.iterator();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号