MetricsServiceImpl.java 文件源码

java
阅读 27 收藏 0 点赞 0 评论 0

项目:hawkular-metrics 作者:
/**
     * Finds the data points of a metric between {@code start} and {@code end}.
     *
     * <p>For gauge, availability and counter metrics the result is a sorted merge of two sources:
     * compressed data (queried from the two-hour time slice containing {@code start} and then passed
     * through {@link DataPointDecompressTransformer} together with the requested range) and temp storage
     * data. Consecutive points that the ordering comparator considers equal are collapsed into one. For
     * every other metric type the query is delegated to the finder returned by
     * {@code getDataPointFinder(metricType)}.
     *
     * <p>The raw-data read latency timer is started once the arguments have been validated and is stopped
     * when the returned observable completes, on both code paths.
     *
     * @param metricId identifies the metric whose data points are requested
     * @param start    start of the requested time range
     * @param end      end of the requested time range
     * @param limit    when greater than zero on the merged path, at most this many points are emitted; it is
     *                 also forwarded to the underlying data access calls
     * @param order    requested ordering; {@code null} is treated as {@link Order#ASC}
     * @param pageSize forwarded to the underlying data access calls
     * @param <T>      value type of the data points
     * @return an observable emitting the matching data points
     * @throws IllegalArgumentException if {@code isValidTimeRange(start, end)} is false (assuming
     *                                  {@code checkArgument} is Guava's {@code Preconditions.checkArgument})
     */
    @Override
    public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit,
                                                       Order order, int pageSize) {

        // Validate before starting the timer so a rejected call never creates a context that is not stopped.
        checkArgument(isValidTimeRange(start, end), "Invalid time range");
        Timer.Context context = getRawDataReadLatency().time();
        Order safeOrder = (null == order) ? Order.ASC : order;
        MetricType<T> metricType = metricId.getType();
        Func1<Row, DataPoint<T>> mapper = getDataPointMapper(metricType);

        if (metricType == GAUGE || metricType == AVAILABILITY || metricType == COUNTER) {
            // Compressed data is looked up from the start of the two-hour slice that contains 'start'.
            long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2));

            // The mapper registry is not typed per metric type, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

            // The legacy uncompressed ("old data") source is intentionally not queried here:
            // that path calls mostly deprecated methods.

            Observable<DataPoint<T>> compressedPoints =
                    dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
                            .compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end));

            Observable<DataPoint<T>> tempStoragePoints = dataAccess.findTempData(metricId, start, end, limit,
                    safeOrder, pageSize)
                    .map(tempMapper);

            Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
            List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(2);
            sources.add(compressedPoints);
            sources.add(tempStoragePoints);

            // Merge both sources in comparator order and drop consecutive points that compare as equal.
            Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
                    .distinctUntilChanged(
                            (tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);

            if (limit > 0) {
                dataPoints = dataPoints.take(limit);
            }

            // Stop the latency timer on completion, exactly like the fallback path below.
            return dataPoints.doOnCompleted(context::stop);
        }
        Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder =
                getDataPointFinder(metricType);

        Observable<DataPoint<T>> results =
                finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
        return results.doOnCompleted(context::stop);
    }
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号