KafkaReporterTest.java 文件源码

java
阅读 20 收藏 0 点赞 0 评论 0

项目:java-kafka 作者:
/**
 * Checks that a {@link KafkaReporter} started against a registry holding one
 * incremented counter causes at least one Avro message to appear on
 * {@code topic}.
 *
 * <p>The test starts the reporter with a one-second period, then reads the
 * first message from the topic through a high-level consumer using a fresh
 * random group id, and asserts that the decoded message is not null.
 *
 * <p>Both the consumer and the reporter are shut down in {@code finally}
 * blocks so that a failed assertion or a failed read does not leave them
 * running for the remainder of the test run.
 */
@Test
public void testTopicReporter() {
    MetricsRegistry registry = new MetricsRegistry();
    Counter counter = registry.newCounter(KafkaReporterTest.class, "test-counter");
    counter.inc();

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnect);
    producerProps.put("schema.registry.url", schemaRegistry);

    KafkaReporter reporter = new KafkaReporter(registry, producerProps, topic);
    reporter.start(1, TimeUnit.SECONDS);
    try {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        // A random group id keeps this run independent of offsets stored by earlier runs.
        props.put("group.id", UUID.randomUUID().toString());
        // Start from the earliest available offset so a message published before
        // the consumer attached is still seen.
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "30000");
        // NOTE(review): per the old consumer's configuration docs this bounds how long
        // the stream iterator blocks before throwing - confirm for the Kafka version in use.
        props.put("consumer.timeout.ms", "30000");
        props.put("schema.registry.url", schemaRegistry);

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        try {
            KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(
                    new Whitelist(topic),
                    1,
                    new StringDecoder(null),
                    new KafkaAvroDecoder(new VerifiableProperties(props))).get(0);

            GenericRecord message = (GenericRecord) messageStream.iterator().next().message();
            assertNotNull(message);
        } finally {
            // Release the consumer even when the read or the assertion fails.
            consumer.shutdown();
        }
    } finally {
        // Always stop the periodic reporter, regardless of the test outcome.
        reporter.shutdown();
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号