KafkaSourceOp.java 文件源码

java
阅读 18 收藏 0 点赞 0 评论 0

项目:StreamCQL 作者:
/**
 * {@inheritDoc}
 */
@Override
public void initialize()
    throws StreamingException
{
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = Maps.newHashMap();
    topicCountMap.put(topic, TOPIC_COUNT);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    consumerIterator = stream.iterator();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号