/**
 * Create a Kafka consumer.
 */
@Override
public void open() {
    // These consumers use ZooKeeper for commit, offset and segment consumption tracking.
    // TODO: consider using SimpleConsumer the same way the Hadoop consumer job does to avoid the ZK dependency
    // TODO: use the task details from TopologyContext in the normal open method
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    // Request a single stream (one consumer thread) because the real parallelism is already handled by Storm.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, Integer.valueOf(1));
    Map<String, List<KafkaMessageStream>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    // Exactly one stream was requested for this topic, so take the first entry.
    KafkaMessageStream stream = consumerMap.get(topic).get(0);
    consumerIterator = stream.iterator();
}
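
The lookup `consumerMap.get(topic).get(0)` in `open()` assumes the connector always returns at least one stream for the topic. If the map had no entry for the topic, or the list were empty, the call would fail with a `NullPointerException` or `IndexOutOfBoundsException`. The following is a minimal sketch of a guarded variant, not part of the original spout. The class `SingleStreamSelection` and both helper names are hypothetical, and the type parameter `S` stands in for `KafkaMessageStream` so that the sketch compiles without the Kafka classes on the classpath.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class for illustration only; it is not part of KafkaSpout.
class SingleStreamSelection {

    /** Builds a topic-to-thread-count map that requests exactly one stream for the topic. */
    static Map<String, Integer> singleThreadTopicMap(String topic) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        return topicCountMap;
    }

    /**
     * Returns the first stream registered for the topic.
     * S is a stand-in for the stream type (KafkaMessageStream in the spout).
     * Throws IllegalStateException with a descriptive message when no stream exists.
     */
    static <S> S firstStream(Map<String, List<S>> consumerMap, String topic) {
        List<S> streams = consumerMap.get(topic);
        if (streams == null || streams.isEmpty()) {
            throw new IllegalStateException("No stream returned for topic: " + topic);
        }
        return streams.get(0);
    }
}
```

Under these assumptions, `open()` could call `firstStream(consumerMap, topic)` in place of the chained `get` calls, so that a misconfigured topic surfaces as a clear error message rather than a bare exception.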
KafkaSpout.java source code
Project: CadalWorkspace