/**
* The error from the user provided Observer is not handled by the subscribe method try/catch.
*
* It is handled by the AtomicObserver that wraps the provided Observer.
*
* Result: Passes (if AtomicObserver functionality exists)
*/
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
final BooleanSubscription s = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (!s.isUnsubscribed()) {
observer.onNext("1");
observer.onNext("2");
observer.onNext("three");
observer.onNext("4");
observer.onCompleted();
}
} finally {
latch.countDown();
}
}
}).start();
return s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
// wait for async sequence to complete
latch.await();
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
}
ObservableTests.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:lakeside-java
作者:
评论列表
文章目录