/**
 * Exercises the six-source {@code combineLatest} overload.
 * <p>
 * Six single-value sources (1 through 6) are combined with a function that collects its six arguments
 * into a list. The mocked observer is expected to receive that list once via {@code onNext}, then
 * {@code onComplete}, and never {@code onError}.
 */
@Test
public void test6SourcesOverload() {
    Observable<Integer> first = Observable.just(1);
    Observable<Integer> second = Observable.just(2);
    Observable<Integer> third = Observable.just(3);
    Observable<Integer> fourth = Observable.just(4);
    Observable<Integer> fifth = Observable.just(5);
    Observable<Integer> sixth = Observable.just(6);

    // Combiner that simply gathers the six latest values, in source order, into a list.
    Func6<Integer, Integer, Integer, Integer, Integer, Integer, List<Integer>> collectAll =
            new Func6<Integer, Integer, Integer, Integer, Integer, Integer, List<Integer>>() {
                @Override
                public List<Integer> call(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f) {
                    return Arrays.asList(a, b, c, d, e, f);
                }
            };

    Observable<List<Integer>> combined =
            Observable.combineLatest(first, second, third, fourth, fifth, sixth, collectAll);

    @SuppressWarnings("unchecked")
    Observer<Object> observer = mock(Observer.class);
    combined.subscribe(observer);

    verify(observer).onNext(Arrays.asList(1, 2, 3, 4, 5, 6));
    verify(observer).onComplete();
    verify(observer, never()).onError(any(Throwable.class));
}
Java 类 rx.functions.Func6 的实例源码
OnSubscribeCombineLatestTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
MetricsServiceImpl.java 文件源码
项目:hawkular-metrics
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Returns the data point finder registered in {@code dataPointFinders} for the given metric type.
 *
 * @param metricType the metric type used as the lookup key
 * @param <T> the value type of the metric
 * @return the registered finder, cast to the metric's value type
 * @throws UnsupportedOperationException if no finder is registered for {@code metricType}; the message
 *         is {@code metricType.getText()}
 */
@SuppressWarnings("unchecked")
private <T> Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> getDataPointFinder(
        MetricType<T> metricType) {
    // The map stores finders with a wildcard id type, so the lookup result has to be narrowed here.
    final Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> registered =
            (Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>>) dataPointFinders
                    .get(metricType);
    if (registered != null) {
        return registered;
    }
    throw new UnsupportedOperationException(metricType.getText());
}
Observable.java 文件源码
项目:letv
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Six-source convenience overload of {@code combineLatest}.
 * <p>
 * Packs the six sources, in argument order, into a list and delegates to the list-based
 * {@code combineLatest} overload, with {@code combineFunction} adapted through
 * {@code Functions.fromFunc}.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param combineFunction the six-argument function handed to {@code Functions.fromFunc}
 * @return the Observable produced by the list-based overload
 */
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    // Raw array on purpose: the element types differ, so the sources are carried untyped.
    Observable[] sources = {o1, o2, o3, o4, o5, o6};
    return combineLatest(Arrays.asList(sources), Functions.fromFunc(combineFunction));
}
Observable.java 文件源码
项目:letv
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Six-source convenience overload of {@code zip}.
 * <p>
 * Emits the array of the six sources (in argument order) through {@code just} and lifts an
 * {@code OperatorZip} built from {@code zipFunction} over it.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param zipFunction the six-argument function passed to the {@code OperatorZip} constructor
 * @return the Observable obtained by lifting the zip operator
 */
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    // The whole array is emitted as ONE item; the operator receives all sources together.
    Observable[] sources = {o1, o2, o3, o4, o5, o6};
    return just(sources).lift(new OperatorZip(zipFunction));
}
Single.java 文件源码
项目:boohee_v5.6
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Six-source {@code zip} for {@code Single}.
 * <p>
 * Each {@code Single} is converted with {@code asObservable}; the resulting array is emitted as one item
 * through {@code just}, and an {@code OperatorZip} built from {@code zipFunction} is lifted over it.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param zipFunction the six-argument function passed (as a raw {@code Func6}) to {@code OperatorZip}
 * @return the {@code Single} obtained by lifting the zip operator
 */
public static final <T1, T2, T3, T4, T5, T6, R> Single<R> zip(Single<? extends T1> o1, Single<? extends T2> o2, Single<? extends T3> o3, Single<? extends T4> o4, Single<? extends T5> o5, Single<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    // Conversions happen in argument order, first to last.
    Observable[] converted = {
        asObservable(o1),
        asObservable(o2),
        asObservable(o3),
        asObservable(o4),
        asObservable(o5),
        asObservable(o6)
    };
    return just(converted).lift(new OperatorZip((Func6) zipFunction));
}
OperatorZip.java 文件源码
项目:boohee_v5.6
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Creates the operator from a six-argument function.
 * <p>
 * The function is adapted with {@code Functions.fromFunc} and the result is stored in
 * {@code zipFunction}.
 *
 * @param f the six-argument function to adapt (accepted as a raw type)
 */
public OperatorZip(Func6 f) {
this.zipFunction = Functions.fromFunc(f);
}
Observable.java 文件源码
项目:boohee_v5.6
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Six-source convenience overload of {@code combineLatest}.
 * <p>
 * Wraps the six sources, in argument order, in a list and forwards to the list-based
 * {@code combineLatest} overload together with {@code combineFunction} adapted by
 * {@code Functions.fromFunc}.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param combineFunction the six-argument function handed to {@code Functions.fromFunc}
 * @return the Observable produced by the list-based overload
 */
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    // Raw array: the six sources have unrelated element types.
    Observable[] inputs = {o1, o2, o3, o4, o5, o6};
    return combineLatest(Arrays.asList(inputs), Functions.fromFunc(combineFunction));
}
Observable.java 文件源码
项目:boohee_v5.6
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Six-source convenience overload of {@code zip}.
 * <p>
 * The six sources are placed, in argument order, in an array that is emitted as a single item via
 * {@code just}; an {@code OperatorZip} built from {@code zipFunction} is then lifted over it.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param zipFunction the six-argument function passed to the {@code OperatorZip} constructor
 * @return the Observable obtained by lifting the zip operator
 */
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
    Observable[] inputs = {o1, o2, o3, o4, o5, o6};
    return just(inputs).lift(new OperatorZip(zipFunction));
}
RxHelper.java 文件源码
项目:sfs
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Six-source overload of {@code combineSinglesDelayError}.
 * <p>
 * Applies {@code single()} to each source, in argument order, and forwards the resulting list to the
 * list-based {@code combineSinglesDelayError} overload together with {@code combineFunction} adapted by
 * {@code fromFunc}.
 *
 * @param o1 the first source
 * @param o2 the second source
 * @param o3 the third source
 * @param o4 the fourth source
 * @param o5 the fifth source
 * @param o6 the sixth source
 * @param combineFunction the six-argument function handed to {@code fromFunc}
 * @return the Observable produced by the list-based overload
 */
@SuppressWarnings("unchecked")
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineSinglesDelayError(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
        Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
    Observable<? extends T1> single1 = o1.single();
    Observable<? extends T2> single2 = o2.single();
    Observable<? extends T3> single3 = o3.single();
    Observable<? extends T4> single4 = o4.single();
    Observable<? extends T5> single5 = o5.single();
    Observable<? extends T6> single6 = o6.single();
    return combineSinglesDelayError(
            asList(single1, single2, single3, single4, single5, single6),
            fromFunc(combineFunction));
}
OperatorZip.java 文件源码
项目:RxJavaFlow
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Creates the operator from a six-argument function.
 * <p>
 * The function is adapted with {@code Functions.fromFunc} and the result is stored in
 * {@code zipFunction}. The raw parameter type is why unchecked and rawtypes warnings are suppressed.
 *
 * @param f the six-argument function to adapt (accepted as a raw type)
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorZip(Func6 f) {
this.zipFunction = Functions.fromFunc(f);
}
MetricsServiceImpl.java 文件源码
项目:hawkular-metrics
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Initializes the service: selects the keyspace, creates the task executor, builds the per-metric-type
 * lookup tables (inserters, finders, mappers) and runs the remaining initialization helpers.
 *
 * @param session the session used to issue the {@code USE} statement; also passed to
 *        {@code initConfiguration} and {@code setDefaultTTL}
 * @param keyspace the keyspace name appended to the {@code USE} statement
 * @param resetDb not referenced in this method body
 * @param createSchema not referenced in this method body
 * @param metricRegistry stored in {@code this.metricRegistry}
 */
public void startUp(Session session, String keyspace, boolean resetDb, boolean createSchema,
HawkularMetricRegistry metricRegistry) {
// Switch the session to the target keyspace before anything else touches it.
session.execute("USE " + keyspace);
log.infoKeyspaceUsed(keyspace);
// Fixed pool of 4 threads, wrapped so that submitted tasks yield listenable futures.
metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));
loadDataRetentions();
this.metricRegistry = metricRegistry;
// Insert function per metric type. Each lambda narrows the wildcard-typed Observable to the
// concrete metric value type before delegating to dataAccess.
pointsInserter = ImmutableMap
.<MetricType<?>, Func1<Observable<? extends Metric<?>>, Observable<Integer>>>builder()
.put(GAUGE, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Double>> gauge = (Observable<Metric<Double>>) metric;
return dataAccess.insertData(gauge);
})
.put(COUNTER, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Long>> counter = (Observable<Metric<Long>>) metric;
return dataAccess.insertData(counter);
})
.put(AVAILABILITY, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<AvailabilityType>> avail = (Observable<Metric<AvailabilityType>>) metric;
return dataAccess.insertData(avail);
})
.put(STRING, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<String>> string = (Observable<Metric<String>>) metric;
// String metrics use a dedicated insert call that also takes the TTL lookup and max size.
return dataAccess.insertStringDatas(string, this::getTTL, maxStringSize);
})
.build();
// Finder function per metric type. Only STRING is registered here.
dataPointFinders = ImmutableMap
.<MetricType<?>, Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer,
Observable<Row>>>builder()
.put(STRING, (metricId, start, end, limit, order, pageSize) -> {
@SuppressWarnings("unchecked")
MetricId<String> stringId = (MetricId<String>) metricId;
return dataAccess.findStringData(stringId, start, end, limit, order, pageSize);
})
.build();
// Row-to-DataPoint mappers per metric type.
dataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder()
.put(GAUGE, Functions::getGaugeDataPoint)
.put(AVAILABILITY, Functions::getAvailabilityDataPoint)
.put(COUNTER, Functions::getCounterDataPoint)
.put(STRING, Functions::getStringDataPoint)
.build();
// Row-to-DataPoint mappers for temp rows; there is no STRING entry in this table.
tempDataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder()
.put(GAUGE, Functions::getTempGaugeDataPoint)
.put(COUNTER, Functions::getTempCounterDataPoint)
.put(AVAILABILITY, Functions::getTempAvailabilityDataPoint)
.build();
// Remaining initialization helpers, run in this order.
initConfiguration(session);
setDefaultTTL(session, keyspace);
initMetrics();
verifyAndCreateTempTables();
// Both tag query parsers are given the data access object and this service.
tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this);
expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this);
}
MetricsServiceImpl.java 文件源码
项目:hawkular-metrics
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Finds the data points of a metric within a time range.
 * <p>
 * For gauge, availability and counter metrics the result is a sorted merge of the compressed data
 * (decompressed through {@code DataPointDecompressTransformer}) and the temp data. Consecutive points
 * that compare equal are collapsed into one and, when {@code limit > 0}, at most {@code limit} points are
 * emitted. Every other metric type is served by the finder returned from {@code getDataPointFinder},
 * with rows converted by the type's data point mapper.
 * <p>
 * The read latency timer started at the top of the method is stopped when the returned Observable
 * completes, on both paths.
 *
 * @param metricId the id of the metric to read; its type selects the lookup path
 * @param start start of the time range
 * @param end end of the time range
 * @param limit maximum number of points; values {@code <= 0} do not truncate the merged result
 * @param order requested ordering; {@code null} is treated as {@code Order.ASC}
 * @param pageSize page size forwarded to the data access calls that accept one
 * @param <T> the value type of the metric
 * @return an Observable of the matching data points
 * @throws IllegalArgumentException if {@code isValidTimeRange(start, end)} is false (assuming
 *         {@code checkArgument} is Guava's {@code Preconditions.checkArgument})
 */
@Override
public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit,
        Order order, int pageSize) {
    Timer.Context context = getRawDataReadLatency().time();
    checkArgument(isValidTimeRange(start, end), "Invalid time range");
    Order safeOrder = (null == order) ? Order.ASC : order;
    MetricType<T> metricType = metricId.getType();
    Func1<Row, DataPoint<T>> mapper = getDataPointMapper(metricType);

    if (metricType == GAUGE || metricType == AVAILABILITY || metricType == COUNTER) {
        // Compressed data is queried from the start of the two-hour slice that contains `start`.
        long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2));
        // The temp mapper table is wildcard-typed; narrow it to this metric's value type.
        @SuppressWarnings("unchecked")
        Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

        // The legacy uncompressed lookup (dataAccess.findOldData) is intentionally not merged in:
        // it relies mostly on deprecated methods.
        Observable<DataPoint<T>> compressedPoints =
                dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
                        .compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end));
        Observable<DataPoint<T>> tempStoragePoints =
                dataAccess.findTempData(metricId, start, end, limit, safeOrder, pageSize)
                        .map(tempMapper);

        Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
        List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(2);
        sources.add(compressedPoints);
        sources.add(tempStoragePoints);

        // Merge both sources in order and drop adjacent points the comparator considers equal.
        Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
                .distinctUntilChanged(
                        (tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);
        if (limit > 0) {
            dataPoints = dataPoints.take(limit);
        }
        // Stop the latency timer on completion, exactly like the finder-based path below; previously
        // the timer was never stopped on this branch, so these reads were not recorded.
        return dataPoints.doOnCompleted(context::stop);
    }

    Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder =
            getDataPointFinder(metricType);
    Observable<DataPoint<T>> results =
            finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
    return results.doOnCompleted(context::stop);
}
OperatorZip.java 文件源码
项目:org.openntf.domino
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Creates the operator from a six-argument function.
 * <p>
 * The function is adapted with {@code Functions.fromFunc} and the result is stored in
 * {@code zipFunction}. The raw parameter type is why unchecked and rawtypes warnings are suppressed.
 *
 * @param f the six-argument function to adapt (accepted as a raw type)
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorZip(Func6 f) {
this.zipFunction = Functions.fromFunc(f);
}