@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
logger.info("Opened");
this.collector = collector;
logger.info(" topic = " + kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());
Properties kafkaProperties =
KafkaConsumerProperties.createKafkaProperties(kafkaSpoutConfig.kafkaConsumerConfiguration);
// Have to use a different consumer.id for each spout so use the storm taskId. Otherwise,
// zookeeper complains about a conflicted ephemeral node when there is more than one spout
// reading from a topic
kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
KafkaSpout.java 文件源码
java
阅读 20
收藏 0
点赞 0
评论 0
项目:monasca-thresh
作者:
评论列表
文章目录