VehicleDataGeneration.java 文件源码

java
阅读 23 收藏 0 点赞 0 评论 0

项目:Practical-Real-time-Processing-and-Analytics 作者:
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();
    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    props.put("group.id", "DataLoader" + r.nextInt(100));
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, new Integer(1)); 

    KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA)
            .get(0);

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    while (it.hasNext()) {
        String message = new String(it.next().message());
        try {
            vehicleStartPoint = objectMapper.readValue(message, new TypeReference<Map<String, Location>>() {
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        break;
    }
    consumer.shutdown();
    return vehicleStartPoint;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号