java类rx.functions.Func6的实例源码

OnSubscribeCombineLatestTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
@Test
public void test6SourcesOverload() {
    Observable<Integer> s1 = Observable.just(1);
    Observable<Integer> s2 = Observable.just(2);
    Observable<Integer> s3 = Observable.just(3);
    Observable<Integer> s4 = Observable.just(4);
    Observable<Integer> s5 = Observable.just(5);
    Observable<Integer> s6 = Observable.just(6);

    Observable<List<Integer>> result = Observable.combineLatest(s1, s2, s3, s4, s5, s6,
            new Func6<Integer, Integer, Integer, Integer, Integer, Integer, List<Integer>>() {
                @Override
                public List<Integer> call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6) {
                    return Arrays.asList(t1, t2, t3, t4, t5, t6);
                }
            });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    result.subscribe(o);

    verify(o).onNext(Arrays.asList(1, 2, 3, 4, 5, 6));
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
MetricsServiceImpl.java 文件源码 项目:hawkular-metrics 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private <T> Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> getDataPointFinder(
        MetricType<T> metricType) {
    Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder;
    finder = (Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>>) dataPointFinders
            .get(metricType);
    if (finder == null) {
        throw new UnsupportedOperationException(metricType.getText());
    }
    return finder;
}
Observable.java 文件源码 项目:letv 阅读 25 收藏 0 点赞 0 评论 0
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    return combineLatest(Arrays.asList(new Observable[]{o1, o2, o3, o4, o5, o6}), Functions.fromFunc(combineFunction));
}
Observable.java 文件源码 项目:letv 阅读 32 收藏 0 点赞 0 评论 0
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    return just(new Observable[]{o1, o2, o3, o4, o5, o6}).lift(new OperatorZip(zipFunction));
}
Single.java 文件源码 项目:boohee_v5.6 阅读 27 收藏 0 点赞 0 评论 0
public static final <T1, T2, T3, T4, T5, T6, R> Single<R> zip(Single<? extends T1> o1, Single<? extends T2> o2, Single<? extends T3> o3, Single<? extends T4> o4, Single<? extends T5> o5, Single<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    return just(new Observable[]{asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6)}).lift(new OperatorZip((Func6) zipFunction));
}
OperatorZip.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
public OperatorZip(Func6 f) {
    this.zipFunction = Functions.fromFunc(f);
}
Observable.java 文件源码 项目:boohee_v5.6 阅读 22 收藏 0 点赞 0 评论 0
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    return combineLatest(Arrays.asList(new Observable[]{o1, o2, o3, o4, o5, o6}), Functions.fromFunc(combineFunction));
}
Observable.java 文件源码 项目:boohee_v5.6 阅读 23 收藏 0 点赞 0 评论 0
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    return just(new Observable[]{o1, o2, o3, o4, o5, o6}).lift(new OperatorZip(zipFunction));
}
RxHelper.java 文件源码 项目:sfs 阅读 26 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineSinglesDelayError(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
                                                                                       Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    return combineSinglesDelayError(asList(o1.single(), o2.single(), o3.single(), o4.single(), o5.single(), o6.single()), fromFunc(combineFunction));
}
OperatorZip.java 文件源码 项目:RxJavaFlow 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorZip(Func6 f) {
    this.zipFunction = Functions.fromFunc(f);
}
MetricsServiceImpl.java 文件源码 项目:hawkular-metrics 阅读 25 收藏 0 点赞 0 评论 0
public void startUp(Session session, String keyspace, boolean resetDb, boolean createSchema,
        HawkularMetricRegistry metricRegistry) {
    session.execute("USE " + keyspace);
    log.infoKeyspaceUsed(keyspace);
    metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));
    loadDataRetentions();

    this.metricRegistry = metricRegistry;

    pointsInserter = ImmutableMap
            .<MetricType<?>, Func1<Observable<? extends Metric<?>>, Observable<Integer>>>builder()
            .put(GAUGE, metric -> {
                @SuppressWarnings("unchecked")
                Observable<Metric<Double>> gauge = (Observable<Metric<Double>>) metric;
                return dataAccess.insertData(gauge);
            })
            .put(COUNTER, metric -> {
                @SuppressWarnings("unchecked")
                Observable<Metric<Long>> counter = (Observable<Metric<Long>>) metric;
                return dataAccess.insertData(counter);
            })
            .put(AVAILABILITY, metric -> {
                @SuppressWarnings("unchecked")
                Observable<Metric<AvailabilityType>> avail = (Observable<Metric<AvailabilityType>>) metric;
                return dataAccess.insertData(avail);
            })
            .put(STRING, metric -> {
                @SuppressWarnings("unchecked")
                Observable<Metric<String>> string = (Observable<Metric<String>>) metric;
                return dataAccess.insertStringDatas(string, this::getTTL, maxStringSize);
            })
            .build();

    dataPointFinders = ImmutableMap
            .<MetricType<?>, Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer,
                    Observable<Row>>>builder()
            .put(STRING, (metricId, start, end, limit, order, pageSize) -> {
                @SuppressWarnings("unchecked")
                MetricId<String> stringId = (MetricId<String>) metricId;
                return dataAccess.findStringData(stringId, start, end, limit, order, pageSize);
            })
            .build();

    dataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder()
            .put(GAUGE, Functions::getGaugeDataPoint)
            .put(AVAILABILITY, Functions::getAvailabilityDataPoint)
            .put(COUNTER, Functions::getCounterDataPoint)
            .put(STRING, Functions::getStringDataPoint)
            .build();

    tempDataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder()
            .put(GAUGE, Functions::getTempGaugeDataPoint)
            .put(COUNTER, Functions::getTempCounterDataPoint)
            .put(AVAILABILITY, Functions::getTempAvailabilityDataPoint)
            .build();

    initConfiguration(session);
    setDefaultTTL(session, keyspace);
    initMetrics();

    verifyAndCreateTempTables();

    tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this);
    expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this);
}
MetricsServiceImpl.java 文件源码 项目:hawkular-metrics 阅读 23 收藏 0 点赞 0 评论 0
@Override
    public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit,
                                                       Order order, int pageSize) {

        Timer.Context context = getRawDataReadLatency().time();
        checkArgument(isValidTimeRange(start, end), "Invalid time range");
        Order safeOrder = (null == order) ? Order.ASC : order;
        MetricType<T> metricType = metricId.getType();
        Func1<Row, DataPoint<T>> mapper = getDataPointMapper(metricType);

        if (metricType == GAUGE || metricType == AVAILABILITY || metricType == COUNTER) {
            long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2));

            Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

            // Calls mostly deprecated methods..
//            Observable<DataPoint<T>> uncompressedPoints = dataAccess.findOldData(metricId, start, end, limit, safeOrder,
//                    pageSize).map(mapper).doOnError(Throwable::printStackTrace);

            Observable<DataPoint<T>> compressedPoints =
                    dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
                            .compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end));

            Observable<DataPoint<T>> tempStoragePoints = dataAccess.findTempData(metricId, start, end, limit,
                    safeOrder, pageSize)
                    .map(tempMapper);

            Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
            List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(3);
//            sources.add(uncompressedPoints);
            sources.add(compressedPoints);
            sources.add(tempStoragePoints);

            Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
                    .distinctUntilChanged(
                            (tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);

            if (limit > 0) {
                dataPoints = dataPoints.take(limit);
            }

            return dataPoints;
        }
        Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder =
                getDataPointFinder(metricType);

        Observable<DataPoint<T>> results =
                finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
        return results.doOnCompleted(context::stop);
    }
OperatorZip.java 文件源码 项目:org.openntf.domino 阅读 23 收藏 0 点赞 0 评论 0
@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorZip(Func6 f) {
    this.zipFunction = Functions.fromFunc(f);
}


问题


面经


文章

微信
公众号

扫码关注公众号