HighLevelConsumerDemo.java 文件源码

java
阅读 28 收藏 0 点赞 0 评论 0

项目:jaf-examples 作者:
public static void main(String[] args) {
    args = new String[] { Constants.ZK_SERVER, Constants.TOPIC_NAME, "group1", "consumer1" };
    if (args == null || args.length != 4) {
        System.err.println("Usage:\n\tjava -jar kafka_consumer.jar ${zookeeper_list} ${topic_name} ${group_name} ${consumer_id}");
        System.exit(1);
    }
    String zk = args[0];
    String topic = args[1];
    String groupid = args[2];
    String consumerid = args[3];
    Properties props = new Properties();
    props.put("zookeeper.connect", zk);
    props.put("group.id", groupid);
    props.put("client.id", "test");
    props.put("consumer.id", consumerid);
    props.put("auto.offset.reset", "smallest");
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

    KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> interator = stream1.iterator();
    while (interator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = interator.next();
        String message = String.format(
                "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
                messageAndMetadata.offset(), new String(messageAndMetadata.key() != null ? messageAndMetadata.key() : "".getBytes()),
                new String(messageAndMetadata.message()));
        System.out.println(message);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号