KafkaSpoutTest.java source code

java

Project: LogRTA  Author:
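public void activate() {
    // Connect to Kafka through the old high-level consumer API.
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    // Request a single stream (one consumer thread) for the topic.
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    System.out.println("*********Results********topic:" + topic);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    // Build the formatter once instead of once per message.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");

    // Note: hasNext() blocks until a message arrives (unless consumer.timeout.ms is set),
    // so this loop keeps activate() from returning.
    while (it.hasNext()) {
        // Decode as UTF-8 (assumed producer encoding) rather than the platform default charset.
        String value = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
        String str = formatter.format(new Date());
        System.out.println("Storm received a message from Kafka ---> " + value);
        // Emit (message, count of 1, receive timestamp); the message doubles as the tuple's message ID.
        collector.emit(new Values(value, 1, str), value);
    }
}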
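
The loop above never returns from activate(), and Storm calls a spout's lifecycle methods (activate, nextTuple, ack, fail) on a single thread. One common way around this is to have a background thread read from Kafka into a bounded queue and let nextTuple() drain it without blocking. The following is only a minimal sketch of that hand-off using the JDK alone: MessageBuffer and its methods are hypothetical names, not part of Storm, Kafka, or the LogRTA project, and UTF-8 payloads are an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: a bounded hand-off between a Kafka reader thread and the spout thread.
class MessageBuffer {
    private final BlockingQueue<byte[]> queue;

    MessageBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Intended for the reader thread; returns false (drops nothing silently) when the buffer is full.
    boolean offer(byte[] payload) {
        return queue.offer(payload);
    }

    // Intended for nextTuple(); never blocks and returns null when nothing is waiting.
    String poll() {
        byte[] payload = queue.poll();
        return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws InterruptedException {
        MessageBuffer buffer = new MessageBuffer(100);

        // Stand-in for the Kafka consumer loop: pushes three fake messages.
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                buffer.offer(("log line " + i).getBytes(StandardCharsets.UTF_8));
            }
        });
        reader.setDaemon(true);
        reader.start();
        reader.join();

        // Stand-in for repeated nextTuple() calls: drain whatever is available.
        String value;
        while ((value = buffer.poll()) != null) {
            System.out.println(value);
        }
    }
}
```

In a spout built this way, activate() would only start the reader thread, and nextTuple() would call poll() and emit when the result is non-null.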