/**
* {@inheritDoc}
*/
@Override
public void initialize()
throws StreamingException
{
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = Maps.newHashMap();
topicCountMap.put(topic, TOPIC_COUNT);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
consumerIterator = stream.iterator();
}
KafkaSourceOp.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:StreamCQL
作者:
评论列表
文章目录