/**
 * Reads the first message available on {@code KAFKA_TOPIC_STATIC_DATA} and
 * deserializes it as a JSON object mapping a string key to a {@code Location}.
 *
 * <p>A fresh consumer group id ({@code "DataLoader"} plus a random suffix below
 * 100) is used on each call and {@code auto.offset.reset} is {@code "smallest"}.
 * NOTE(review): presumably this is so that every call re-reads the topic from
 * its beginning; the random range is small, so a group id may repeat across
 * runs -- confirm this is acceptable.
 *
 * <p>NOTE(review): no {@code consumer.timeout.ms} is configured here, so
 * {@code hasNext()} looks like it will block until a message arrives -- confirm
 * against the Kafka consumer configuration in use.
 *
 * @return the decoded map; an empty map when no message is obtained, when the
 *         payload cannot be parsed, or when the payload is a JSON {@code null}
 */
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();

    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    props.put("group.id", "DataLoader" + r.nextInt(100));
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    try {
        // Request a single stream for the static-data topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, Integer.valueOf(1));
        KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap)
                .get(KAFKA_TOPIC_STATIC_DATA)
                .get(0);

        // Only the first message is consumed; any later messages are ignored.
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        if (it.hasNext()) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            try {
                Map<String, Location> parsed = objectMapper.readValue(message,
                        new TypeReference<Map<String, Location>>() {
                        });
                // A JSON "null" payload yields null; keep the empty map in that case.
                if (parsed != null) {
                    vehicleStartPoint = parsed;
                }
            } catch (IOException e) {
                // Best effort: report the parse failure and fall back to the empty map.
                e.printStackTrace();
            }
        }
    } finally {
        // Always release the connector, even if stream creation or iteration fails.
        consumer.shutdown();
    }
    return vehicleStartPoint;
}
VehicleDataGeneration.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:Practical-Real-time-Processing-and-Analytics
作者:
评论列表
文章目录