/**
 * Subscribes to a {@code combineLatest} of two four-element sources with a
 * subscriber that requests 2 items on start and then {@code Long.MAX_VALUE - 1}
 * more on every {@code onNext}, so the running total of requested items
 * exceeds {@code Long.MAX_VALUE}. The test passes once at least four items
 * have been delivered within the wait window.
 *
 * NOTE(review): presumably this guards against overflow in the operator's
 * request accounting - confirm against the combineLatest implementation.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *         waiting on the latch
 */
@Test(timeout=10000)
public void testCombineLatestRequestOverflow() throws InterruptedException {
    Observable<Integer> left = Observable.from(Arrays.asList(1,2,3,4));
    Observable<Integer> right = Observable.from(Arrays.asList(5,6,7,8));
    List<Observable<Integer>> inputs = Arrays.asList(left, right);

    // Combiner simply forwards the latest value of the first source.
    FuncN<Integer> takeFirstArg = new FuncN<Integer>() {
        @Override
        public Integer call(Object... values) {
            return (Integer) values[0];
        }
    };
    Observable<Integer> combined = Observable.combineLatest(inputs, takeFirstArg);

    // Expect at least 4 emissions to reach the subscriber.
    final CountDownLatch delivered = new CountDownLatch(4);

    Subscriber<Integer> greedySubscriber = new Subscriber<Integer>() {
        @Override
        public void onStart() {
            // Small initial demand; the large requests follow in onNext.
            request(2);
        }

        @Override
        public void onComplete() {
            // completion is irrelevant to this test
        }

        @Override
        public void onError(Throwable e) {
            throw new RuntimeException(e);
        }

        @Override
        public void onNext(Integer item) {
            delivered.countDown();
            // Added on top of the earlier requests, this pushes the total
            // requested amount past Long.MAX_VALUE.
            request(Long.MAX_VALUE-1);
        }
    };

    combined.subscribeOn(Schedulers.computation()).subscribe(greedySubscriber);

    boolean allDelivered = delivered.await(10, TimeUnit.SECONDS);
    assertTrue(allDelivered);
}
OnSubscribeCombineLatestTest.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:RxJavaFlow
作者:
评论列表
文章目录