/**
 * Runs a pipeline that reads through {@code mkKafkaReadTransform(numElements, ...)} and checks
 * the source metrics it reports.
 *
 * <p>The element and byte counters (overall and for split {@code "0"}) are asserted on their
 * attempted values for the read step. The per-split backlog gauges are only checked for
 * existence, because their values may be inconsistent in some environments.
 */
@Test
public void testUnboundedSourceMetrics() {
  int numElements = 1000;
  // Expected counter values are derived from numElements so the assertions cannot drift away
  // from the number of records requested from the read transform.
  long expectedElements = numElements;
  // 12 bytes per record reproduces the previously hard-coded 12000 bytes for 1000 records.
  // NOTE(review): presumably the encoded key size plus value size of each generated record;
  // confirm against the test data generator.
  long expectedBytes = 12L * numElements;

  String readStep = "readFromKafka";

  p.apply(readStep,
      mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());

  PipelineResult result = p.run();

  String splitId = "0";

  MetricName elementsRead = SourceMetrics.elementsRead().getName();
  MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId).getName();
  MetricName bytesRead = SourceMetrics.bytesRead().getName();
  MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId).getName();
  MetricName backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId).getName();
  MetricName backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId).getName();

  // An empty filter returns every metric reported by the pipeline.
  MetricQueryResults metrics = result.metrics().queryMetrics(
      MetricsFilter.builder().build());

  Iterable<MetricResult<Long>> counters = metrics.counters();

  assertThat(counters, hasItem(attemptedMetricsResult(
      elementsRead.namespace(),
      elementsRead.name(),
      readStep,
      expectedElements)));

  assertThat(counters, hasItem(attemptedMetricsResult(
      elementsReadBySplit.namespace(),
      elementsReadBySplit.name(),
      readStep,
      expectedElements)));

  assertThat(counters, hasItem(attemptedMetricsResult(
      bytesRead.namespace(),
      bytesRead.name(),
      readStep,
      expectedBytes)));

  assertThat(counters, hasItem(attemptedMetricsResult(
      bytesReadBySplit.namespace(),
      bytesReadBySplit.name(),
      readStep,
      expectedBytes)));

  assertSingleGaugeReported(result, backlogElementsOfSplit);
  assertSingleGaugeReported(result, backlogBytesOfSplit);
}

/**
 * Queries {@code result} for the metric named {@code gaugeName} and asserts that exactly one
 * gauge is returned.
 */
private static void assertSingleGaugeReported(PipelineResult result, MetricName gaugeName) {
  MetricQueryResults gaugeMetrics =
      result.metrics().queryMetrics(
          MetricsFilter.builder()
              .addNameFilter(
                  MetricNameFilter.named(
                      gaugeName.namespace(),
                      gaugeName.name()))
              .build());

  // since gauge values may be inconsistent in some environments assert only on their existence.
  assertThat(gaugeMetrics.gauges(),
      IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1));
}
KafkaIOTest.java 文件源码
java
阅读 29
收藏 0
点赞 0
评论 0
项目:beam
作者:
评论列表
文章目录