private Observable example3() {
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
final Scheduler.Worker worker = Schedulers.io().createWorker();
subscriber.add(worker);
worker.schedulePeriodically(new Action0() {
long counter = 0;
@Override
public void call() {
if (counter == 8) {
subscriber.onError(new Throwable());
}
try {
subscriber.onNext("Observable1 " + counter++ * 10);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, subscriber);
}
}
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
});
Observable<String> observable2 = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long position) {
return "observable2 " + position * 10;
}
}).take(10);
List<Observable<String>> observableList = new ArrayList<>();
observableList.add(observable1);
observableList.add(observable2);
return Observable.combineLatestDelayError(observableList, new FuncN<Object>() {
@Override
public Object call(Object... args) {
List<String> results = new ArrayList<>();
for (Object object : args) {
results.add((String) object);
}
return results;
}
});
}
MainListWithExample_Observable_combineLatest.java 文件源码
java
阅读 31
收藏 0
点赞 0
评论 0
项目:RxJavaDemo
作者:
评论列表
文章目录