KafkaIOTest.java 文件源码

java
阅读 29 收藏 0 点赞 0 评论 0

项目:beam 作者:
/**
 * Runs a pipeline that applies the transform built by {@code mkKafkaReadTransform} under the
 * step name {@code "readFromKafka"} and verifies the metrics reported for that step.
 *
 * <p>The element and byte counters (overall and for split {@code "0"}) are matched against
 * fixed expected values through {@code attemptedMetricsResult}. The two backlog gauges for the
 * split are only checked for existence.
 */
@Test
public void testUnboundedSourceMetrics() {
  final int numElements = 1000;
  final String readStep = "readFromKafka";
  final String splitId = "0";

  // Expected counter values asserted for the read step.
  final long expectedElementCount = 1000L;
  final long expectedByteCount = 12000L;

  p.apply(
      readStep,
      mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());
  PipelineResult pipelineResult = p.run();

  MetricName elementsReadName = SourceMetrics.elementsRead().getName();
  MetricName elementsReadBySplitName = SourceMetrics.elementsReadBySplit(splitId).getName();
  MetricName bytesReadName = SourceMetrics.bytesRead().getName();
  MetricName bytesReadBySplitName = SourceMetrics.bytesReadBySplit(splitId).getName();
  MetricName backlogElementsName = SourceMetrics.backlogElementsOfSplit(splitId).getName();
  MetricName backlogBytesName = SourceMetrics.backlogBytesOfSplit(splitId).getName();

  // An empty filter is used here, so the counters are looked up among all query results.
  MetricsFilter unfiltered = MetricsFilter.builder().build();
  MetricQueryResults allMetrics = pipelineResult.metrics().queryMetrics(unfiltered);
  Iterable<MetricResult<Long>> counters = allMetrics.counters();

  // Element counters: overall first, then per split.
  for (MetricName counterName : new MetricName[] {elementsReadName, elementsReadBySplitName}) {
    assertThat(
        counters,
        hasItem(
            attemptedMetricsResult(
                counterName.namespace(), counterName.name(), readStep, expectedElementCount)));
  }

  // Byte counters: overall first, then per split.
  for (MetricName counterName : new MetricName[] {bytesReadName, bytesReadBySplitName}) {
    assertThat(
        counters,
        hasItem(
            attemptedMetricsResult(
                counterName.namespace(), counterName.name(), readStep, expectedByteCount)));
  }

  // Backlog gauges: elements first, then bytes, each queried through its own name filter.
  for (MetricName gaugeName : new MetricName[] {backlogElementsName, backlogBytesName}) {
    MetricNameFilter byName = MetricNameFilter.named(gaugeName.namespace(), gaugeName.name());
    MetricQueryResults gaugeQuery =
        pipelineResult.metrics().queryMetrics(
            MetricsFilter.builder().addNameFilter(byName).build());

    // Gauge values may be inconsistent in some environments, so assert only that exactly one
    // gauge result exists rather than checking its value.
    assertThat(
        gaugeQuery.gauges(),
        IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1));
  }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号