/**
 * Finds the data points of a metric within a time range.
 *
 * <p>For {@code GAUGE}, {@code AVAILABILITY} and {@code COUNTER} metrics the result is a sorted merge of the
 * compressed data and the temp-storage data, with consecutive points that compare as equal collapsed into one.
 * For every other metric type the rows returned by the type-specific finder are mapped to data points.
 *
 * <p>The raw-data read latency timer is stopped when the returned observable completes, on both paths.
 *
 * @param metricId the metric to read
 * @param start    start of the time range (passed through to the data access layer)
 * @param end      end of the time range (passed through to the data access layer)
 * @param limit    maximum number of points to emit on the merged path; values {@code <= 0} apply no
 *                 {@code take} there. Also forwarded to the underlying queries.
 * @param order    requested ordering; {@code null} is treated as {@code Order.ASC}
 * @param pageSize page size forwarded to the underlying queries
 * @param <T>      the value type of the metric
 * @return an observable of the matching data points
 * @throws IllegalArgumentException if {@code isValidTimeRange(start, end)} is false
 */
@Override
public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit,
        Order order, int pageSize) {
    // Validate before starting the timer so a rejected request does not leave a started, never-stopped context.
    checkArgument(isValidTimeRange(start, end), "Invalid time range");
    Timer.Context context = getRawDataReadLatency().time();

    Order safeOrder = (null == order) ? Order.ASC : order;
    MetricType<T> metricType = metricId.getType();
    // Only consumed by the generic finder path at the bottom; resolved here to keep the original call order.
    Func1<Row, DataPoint<T>> mapper = getDataPointMapper(metricType);

    if (metricType == GAUGE || metricType == AVAILABILITY || metricType == COUNTER) {
        // The compressed lookup starts at the time slice computed from 'start' with a 2 hour duration
        // (presumably the enclosing compressed block boundary - confirm against DateTimeService), while
        // the transformer below is still given the exact start/end.
        long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2));
        Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

        // NOTE: the legacy uncompressed lookup (dataAccess.findOldData) is intentionally not part of the
        // merge; it relies on mostly deprecated methods.
        Observable<DataPoint<T>> compressedPoints =
                dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
                        .compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end));
        Observable<DataPoint<T>> tempStoragePoints =
                dataAccess.findTempData(metricId, start, end, limit, safeOrder, pageSize)
                        .map(tempMapper);

        Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
        List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(2);
        sources.add(compressedPoints);
        sources.add(tempStoragePoints);

        // Merge both sorted sources and drop adjacent points that the comparator considers equal.
        Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
                .distinctUntilChanged(
                        (tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);
        if (limit > 0) {
            dataPoints = dataPoints.take(limit);
        }
        // Stop the latency timer on this path as well; previously only the generic path below did so.
        return dataPoints.doOnCompleted(context::stop);
    }

    Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder =
            getDataPointFinder(metricType);
    Observable<DataPoint<T>> results =
            finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
    return results.doOnCompleted(context::stop);
}
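/*
 * Snippet-site page metadata (not part of the program):
 * MetricsServiceImpl.java - file source code
 * Language: java
 * Views: 27
 * Favorites: 0
 * Likes: 0
 * Comments: 0
 * Project: hawkular-metrics
 * Author:
 * Comment list
 * Table of contents
 */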