KafkaSpout.java source code

java

Project: CadalWorkspace Author:
/**
 * Create a Kafka consumer.
 */
@Override
public void open() {

    // these consumers use ZooKeeper for commit, offset and segment consumption tracking
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // consumer with just one thread since the real parallelism is handled by Storm already
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1); // autoboxed; the new Integer(int) constructor is deprecated

    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    KafkaMessageStream stream = consumerMap.get(topic).get(0);

    consumerIterator = stream.iterator();
}
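
The lookup pattern in open() (request one stream per topic, then take the first stream's iterator) can be sketched without a Kafka dependency. This is only an illustration: the class SingleStreamSketch and the helpers createStreams and openSingleStream are hypothetical, and in-memory lists stand in for the streams that consumerConnector.createMessageStreams would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SingleStreamSketch {

    // Hypothetical stand-in for consumerConnector.createMessageStreams(...):
    // for each topic, returns as many in-memory "streams" as were requested.
    static Map<String, List<List<String>>> createStreams(Map<String, Integer> topicCountMap) {
        Map<String, List<List<String>>> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            List<List<String>> streams = new ArrayList<>();
            for (int i = 0; i < entry.getValue(); i++) {
                // Each fake stream carries two placeholder messages.
                streams.add(Arrays.asList(entry.getKey() + "-msg-0", entry.getKey() + "-msg-1"));
            }
            result.put(entry.getKey(), streams);
        }
        return result;
    }

    // Mirrors open(): ask for exactly one stream for the topic and return its iterator.
    static Iterator<String> openSingleStream(String topic) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        return createStreams(topicCountMap).get(topic).get(0).iterator();
    }

    public static void main(String[] args) {
        Iterator<String> it = openSingleStream("events");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Because only one stream is requested, get(0) is always the sole entry in the list; asking for more streams per topic would require one consuming thread per stream.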