@Test
public void testCodahaleKafkaMetricsReporter() {
registry = new MetricRegistry();
registry.counter("test_counter").inc();
kafkaReporter = KafkaReporter.builder(registry,
kafkaConnect,
topic,
schemaRegistry).build();
// ObjectMapper mapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS,
// TimeUnit.SECONDS,
// false));
// StringWriter r = new StringWriter();
// try {
// mapper.writeValue(r, registry);
// } catch (IOException e) {
// e.printStackTrace();
// }
kafkaReporter.report();
Properties props = new Properties();
props.put("zookeeper.connect", zkConnect);
props.put("group.id", UUID.randomUUID().toString());
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "30000");
props.put("consumer.timeout.ms", "30000");
props.put("schema.registry.url", schemaRegistry);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(new Whitelist(topic),
1,
new StringDecoder(null),
new KafkaAvroDecoder(new VerifiableProperties(props))).get(0);
GenericRecord message = (GenericRecord) messageStream.iterator().next().message();
assertNotNull(message);
}
KafkaReporterTest.java 文件源码
java
阅读 20
收藏 0
点赞 0
评论 0
项目:java-kafka
作者:
评论列表
文章目录