public void activate() {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
System.out.println("*********Results********topic:"+topic);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
while(it.hasNext()){
String value =new String(it.next().message());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");
Date curDate = new Date(System.currentTimeMillis());
String str = formatter.format(curDate);
System.out.println("storm接收到来自kafka的消息--->" + value);
collector.emit(new Values(value,1,str), value);
}
}
KafkaSpoutTest.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:LogRTA
作者:
评论列表
文章目录