/**
* Helper function that consumes messages from a topic with a timeout.
*/
private static void verifyMetricsSubmittedToTopic(
String zkConnect,
String topic,
int expNumMetricSubmissions) throws IOException {
int timeoutMs = 10 * 1000;
KafkaMetricsToFile kafkaMetricsToFile = new KafkaMetricsToFile(zkConnect, timeoutMs);
List<KafkaStream<byte[], byte[]>> streams = kafkaMetricsToFile.getStreams(topic);
int numRecords = 0;
AvroDeserializer decoder = new AvroDeserializer();
try {
for (final KafkaStream<byte[], byte[]> stream : streams) {
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
SupportKafkaMetricsBasic[] container = decoder.deserialize(SupportKafkaMetricsBasic.class,
messageAndMetadata.message());
assertThat(container.length).isEqualTo(1);
verifyBasicMetrics(container[0]);
numRecords++;
}
}
} catch (ConsumerTimeoutException e) {
// do nothing, this is expected success case since we consume with a timeout
}
assertThat(numRecords).isEqualTo(expNumMetricSubmissions);
// Cleanup
kafkaMetricsToFile.shutdown();
}
MetricsToKafkaTest.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:support-metrics-client
作者:
评论列表
文章目录