KafkaReporterTest.java 文件源码

java
阅读 20 收藏 0 点赞 0 评论 0

项目:java-kafka 作者:
/**
 * Checks that a {@code KafkaReporter} built over a Codahale
 * {@code MetricRegistry} publishes something readable to the configured topic.
 *
 * <p>The test increments one counter, triggers a single {@code report()} call,
 * then reads the first message on the topic with a high-level consumer that
 * decodes values through {@code KafkaAvroDecoder}, and asserts that the decoded
 * {@code GenericRecord} is not null.
 *
 * <p>The consumer connector is always shut down in a {@code finally} block so
 * that a failed assertion or a consumer timeout does not leave the connector
 * (and whatever connections/threads it owns) alive for later tests.
 */
@Test
    public void testCodahaleKafkaMetricsReporter() {
        registry = new MetricRegistry();
        registry.counter("test_counter").inc();

        kafkaReporter = KafkaReporter.builder(registry,
                kafkaConnect,
                topic,
                schemaRegistry).build();

        // Push the current registry contents once, synchronously, instead of
        // waiting for a scheduled reporting interval.
        kafkaReporter.report();

        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        // A fresh group id per run, combined with "smallest", makes the consumer
        // start from the earliest available offset rather than a committed one.
        props.put("group.id", UUID.randomUUID().toString());
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "30000");
        // Bounds how long the stream iterator may block waiting for a message.
        props.put("consumer.timeout.ms", "30000");
        props.put("schema.registry.url", schemaRegistry);

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        try {
            KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(new Whitelist(topic),
                    1,
                    new StringDecoder(null),
                    new KafkaAvroDecoder(new VerifiableProperties(props))).get(0);

            GenericRecord message = (GenericRecord) messageStream.iterator().next().message();
            assertNotNull(message);
        } finally {
            // Release the connector even when reading or the assertion fails.
            consumer.shutdown();
        }
    }
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号