MetricsToKafkaTest.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:support-metrics-client 作者:
/**
 * Consumes messages from a topic until the consumer times out, verifies each one, and checks
 * that the total number of consumed records matches the expected count.
 *
 * <p>Every message is deserialized into an array of {@code SupportKafkaMetricsBasic}; the array
 * must contain exactly one element, which is then checked with {@code verifyBasicMetrics}.
 * The consumer is always shut down before this method returns, including when a verification
 * fails or an exception is thrown.
 *
 * @param zkConnect connection string handed to {@code KafkaMetricsToFile}
 * @param topic topic whose streams are consumed
 * @param expNumMetricSubmissions number of records expected to be read before the timeout
 * @throws IOException declared by the original signature; presumably propagated from
 *     deserializing a message (the throwing call is not visible here)
 */
private static void verifyMetricsSubmittedToTopic(
    String zkConnect,
    String topic,
    int expNumMetricSubmissions) throws IOException {
  int timeoutMs = 10 * 1000;
  KafkaMetricsToFile kafkaMetricsToFile = new KafkaMetricsToFile(zkConnect, timeoutMs);

  int numRecords = 0;
  try {
    List<KafkaStream<byte[], byte[]>> streams = kafkaMetricsToFile.getStreams(topic);
    AvroDeserializer decoder = new AvroDeserializer();
    for (final KafkaStream<byte[], byte[]> stream : streams) {
      for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
        SupportKafkaMetricsBasic[] container = decoder.deserialize(SupportKafkaMetricsBasic.class,
            messageAndMetadata.message());
        assertThat(container.length).isEqualTo(1);
        verifyBasicMetrics(container[0]);
        numRecords++;
      }
    }
  } catch (ConsumerTimeoutException ignored) {
    // Expected success case: we consume with a timeout, so running out of messages ends the
    // loop with this exception.
    // NOTE(review): the first timeout ends consumption for any remaining streams as well;
    // confirm getStreams never returns more than one stream that matters here.
  } finally {
    // Cleanup must run even when a verification above fails, otherwise the consumer leaks.
    kafkaMetricsToFile.shutdown();
  }

  assertThat(numRecords).isEqualTo(expNumMetricSubmissions);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号