KafkaDataSpout.java source code

java

Project: storm-demos Author:
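public void nextTuple() {
    // Request a single stream for the topic: one executor, one consumer thread.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TopologyConfig.kafkaTopic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap);
    // Look the streams up with the same key that was registered above.
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TopologyConfig.kafkaTopic);
    ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
    // Note: this loop does not return, so the executor thread stays inside nextTuple().
    while (true) {
        while (iter.hasNext()) {
            String s = new String(iter.next().message());

            // Record the tuple as pending first, then emit it with its message ID.
            // Without the ID Storm does not track the tuple, ack()/fail() are never
            // called for it, and the pending map only grows.
            UUID msgId = UUID.randomUUID();
            this.pending.put(msgId, new Values(s));
            collector.emit(new Values(s), msgId);
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("Spout: sleep interrupted", e);
            // Restore the interrupt flag and leave instead of spinning on sleep().
            Thread.currentThread().interrupt();
            return;
        }
    }
}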
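
The excerpt stores every emitted tuple in `pending` but does not show how entries leave that map. A minimal sketch of the bookkeeping, assuming the spout's `ack(Object msgId)` and `fail(Object msgId)` callbacks are meant to drop or replay the stored value: `PendingTracker` and its method names are hypothetical and not part of storm-demos, and a plain `String` stands in for `Values` so the example runs without Storm on the classpath.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not from storm-demos): tracks emitted-but-unacked messages.
class PendingTracker {
    private final Map<UUID, String> pending = new ConcurrentHashMap<>();

    // Call where the spout emits: store the payload under a fresh message ID
    // and return the ID so it can be passed to collector.emit(tuple, msgId).
    UUID track(String payload) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, payload);
        return msgId;
    }

    // Mirrors a spout's ack(Object msgId): the tuple was fully processed, forget it.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Mirrors a spout's fail(Object msgId): return the payload to re-emit under
    // the same ID, or null if the ID is unknown (for example, already acked).
    String fail(Object msgId) {
        return pending.get(msgId);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        UUID first = tracker.track("message-1");
        UUID second = tracker.track("message-2");
        tracker.ack(first);                        // processed, entry removed
        String replay = tracker.fail(second);      // still pending, should be re-emitted
        System.out.println(replay + " pending=" + tracker.size());
    }
}
```

In a real spout, `fail` would pass the returned value back to `collector.emit` with the same message ID; whether storm-demos does this is not visible in the excerpt.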