/**
 * Maps every incoming notification and dematerializes the result. While the running
 * counter has not exceeded {@code count}, the notification is replaced by an onNext
 * notification carrying the counter value; afterwards (or when {@code count} is zero)
 * the incoming notification is passed through untouched.
 */
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
    return ts.map(new Function<Notification<?>, Notification<?>>() {
        // How many notifications have been inspected so far (only advanced when count != 0).
        int seen = 0;

        @Override
        public Notification<?> call(Notification<?> terminalNotification) {
            if (count == 0) {
                return terminalNotification;
            }
            seen++;
            if (seen > count) {
                return terminalNotification;
            }
            return Notification.createOnNext(seen);
        }
    }).dematerialize();
}
java类rx.functions.Function的实例源码
OnSubscribeRedo.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
OperatorTakeTest.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
/** A mapper that throws after {@code take(2)} must reach the observer as exactly one onError. */
@Test
public void testTakeWithErrorHappeningInOnNext() {
    Function<Integer, Integer> alwaysThrows = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer item) {
            throw new IllegalArgumentException("some error");
        }
    };
    Observable<Integer> failing = Observable.from(Arrays.asList(1, 2, 3)).take(2).map(alwaysThrows);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    failing.subscribe(observer);

    // Only the error may be observed; nothing before or after it.
    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
    ordered.verifyNoMoreInteractions();
}
OperatorDoOnEachTest.java 文件源码
项目:RxJavaFlow
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Runs the takeWhile/toList/doOnNext pipeline once per entry of {@code nums} and expects
 * the doOnNext action to fire exactly once per run.
 */
@Test
public void testIssue1451Case1() {
    // https://github.com/Netflix/RxJava/issues/1451
    final int[] nums = { 1, 2, 3 };
    final AtomicInteger count = new AtomicInteger();

    // Stateless helpers, shared by every iteration.
    final Function<Boolean, Boolean> whileTrue = new Function<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean value) {
            return value;
        }
    };
    final Action1<List<Boolean>> countEmission = new Action1<List<Boolean>>() {
        @Override
        public void call(List<Boolean> booleans) {
            count.incrementAndGet();
        }
    };

    for (int i = 0; i < nums.length; i++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(whileTrue)
                .toList()
                .doOnNext(countEmission)
                .subscribe();
    }

    assertEquals(nums.length, count.get());
}
OperatorDebounceTest.java 文件源码
项目:RxJavaFlow
阅读 29
收藏 0
点赞 0
评论 0
/** A debounce selector that throws must yield onError and neither onNext nor onComplete. */
@Test
public void debounceSelectorFuncThrows() {
    PublishSubject<Integer> subject = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> sink = mock(Observer.class);

    subject.debounce(new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            throw new TestException();
        }
    }).subscribe(sink);

    subject.onNext(1);

    verify(sink, never()).onNext(any());
    verify(sink, never()).onComplete();
    verify(sink).onError(any(TestException.class));
}
OperatorDoOnEachTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Same scenario as case 1 but the source carries two trailing {@code FALSE} values;
 * the doOnNext action must still fire exactly once per run.
 */
@Test
public void testIssue1451Case2() {
    // https://github.com/Netflix/RxJava/issues/1451
    final int[] nums = { 1, 2, 3 };
    final AtomicInteger count = new AtomicInteger();

    // Stateless helpers, shared by every iteration.
    final Function<Boolean, Boolean> passThrough = new Function<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean value) {
            return value;
        }
    };
    final Action1<List<Boolean>> tally = new Action1<List<Boolean>>() {
        @Override
        public void call(List<Boolean> booleans) {
            count.incrementAndGet();
        }
    };

    for (int run = 0; run < nums.length; run++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
                .takeWhile(passThrough)
                .toList()
                .doOnNext(tally)
                .subscribe();
    }

    assertEquals(nums.length, count.get());
}
OperatorAnyTest.java 文件源码
项目:RxJavaFlow
阅读 23
收藏 0
点赞 0
评论 0
/** {@code exists} with a predicate matched by one source item must emit {@code true} once and complete. */
@Test
public void testAnyWithPredicate1() {
    Function<Integer, Boolean> lessThanTwo = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value < 2;
        }
    };
    Observable<Integer> numbers = Observable.just(1, 2, 3);
    Observable<Boolean> anyMatch = numbers.exists(lessThanTwo);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    anyMatch.subscribe(observer);

    verify(observer, never()).onNext(false);
    verify(observer, times(1)).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
OperatorToMultimapTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/** A value selector that throws for {@code "b"} must make {@code toMultimap} signal onError only. */
@Test
public void testToMultimapWithErrorInValueSelector() {
    Function<String, String> doubleOrFail = new Function<String, String>() {
        @Override
        public String call(String item) {
            if (!"b".equals(item)) {
                return item + item;
            }
            throw new RuntimeException("Forced failure");
        }
    };

    Observable<String> letters = Observable.just("a", "b", "cc", "dd");
    Observable<Map<Integer, Collection<String>>> multimap = letters.toMultimap(lengthFunc, doubleOrFail);

    // The map that would have been produced had no error occurred; it must never be emitted.
    Map<Integer, Collection<String>> neverEmitted = new HashMap<Integer, Collection<String>>();
    neverEmitted.put(1, Arrays.asList("aa", "bb"));
    neverEmitted.put(2, Arrays.asList("cccc", "dddd"));

    multimap.subscribe(objectObserver);

    verify(objectObserver, times(1)).onError(any(Throwable.class));
    verify(objectObserver, never()).onNext(neverEmitted);
    verify(objectObserver, never()).onComplete();
}
GroupByTests.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
/** Merges two event streams, groups them by type, takes one group and prints it. */
@Test
public void testTakeUnsubscribesOnGroupBy() {
    Function<Event, String> byType = new Function<Event, String>() {
        @Override
        public String call(Event event) {
            return event.type;
        }
    };
    Action1<GroupedObservable<String, Event>> printGroup = new Action1<GroupedObservable<String, Event>>() {
        @Override
        public void call(GroupedObservable<String, Event> g) {
            System.out.println(g);
        }
    };

    Observable.merge(
            EventStream.getEventStream("HTTP-ClusterA", 50),
            EventStream.getEventStream("HTTP-ClusterB", 20))
            // group by type (2 clusters)
            .groupBy(byType)
            .take(1)
            .toBlocking()
            .forEach(printGroup);

    System.out.println("**** finished");
}
OnSubscribeUsingTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Checks that the resource is released when the observable factory handed to
 * {@code Observable.using} throws a {@link TestException}.
 *
 * @param disposeEagerly forwarded to {@code Observable.using} so that the caller-selected
 *                       disposal mode is the one actually exercised
 */
private void performTestUsingWithObservableFactoryError(boolean disposeEagerly) {
    final Action0 unsubscribe = mock(Action0.class);
    Supplier<Subscription> resourceFactory = new Supplier<Subscription>() {
        @Override
        public Subscription call() {
            return Subscriptions.create(unsubscribe);
        }
    };
    Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Subscription subscription) {
            throw new TestException();
        }
    };
    try {
        // Pass the flag through; previously it was ignored and every caller ran the same path.
        Observable.using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly).toBlocking()
                .last();
        fail("Should throw a TestException when the observableFactory throws it");
    } catch (TestException e) {
        // Make sure that unsubscribe is called so that users can close
        // the resource if some error happens.
        verify(unsubscribe, times(1)).call();
    }
}
OperatorAnyTest.java 文件源码
项目:RxJavaFlow
阅读 29
收藏 0
点赞 0
评论 0
/** {@code exists} must emit a single {@code true} and complete when some item is below 2. */
@Test
public void testExists1() {
    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);

    Observable<Integer> source = Observable.just(1, 2, 3);
    Observable<Boolean> result = source.exists(new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer candidate) {
            return candidate < 2;
        }
    });
    result.subscribe(observer);

    verify(observer, never()).onNext(false);
    verify(observer, times(1)).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
OperatorDebounceTest.java 文件源码
项目:RxJavaFlow
阅读 45
收藏 0
点赞 0
评论 0
/** A debounce selector returning an erroring Observable must yield onError and nothing else. */
@Test
public void debounceSelectorObservableThrows() {
    Function<Integer, Observable<Integer>> erroringSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            return Observable.error(new TestException());
        }
    };
    PublishSubject<Integer> subject = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> sink = mock(Observer.class);
    subject.debounce(erroringSelector).subscribe(sink);

    subject.onNext(1);

    verify(sink, never()).onNext(any());
    verify(sink, never()).onComplete();
    verify(sink).onError(any(TestException.class));
}
OperatorAllTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
/** {@code all} must emit {@code true} then complete when every string has length 3. */
@Test
@SuppressWarnings("unchecked")
public void testAll() {
    Function<String, Boolean> hasThreeChars = new Function<String, Boolean>() {
        @Override
        public Boolean call(String word) {
            return word.length() == 3;
        }
    };
    Observable<String> words = Observable.just("one", "two", "six");
    Observer<Boolean> observer = mock(Observer.class);

    words.all(hasThreeChars).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorAllTest.java 文件源码
项目:RxJavaFlow
阅读 23
收藏 0
点赞 0
评论 0
/** {@code all} must emit {@code false} then complete when one string is not of length 3. */
@Test
@SuppressWarnings("unchecked")
public void testNotAll() {
    Function<String, Boolean> lengthIsThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String text) {
            return text.length() == 3;
        }
    };
    Observable<String> input = Observable.just("one", "two", "three", "six");
    Observer<Boolean> observer = mock(Observer.class);

    input.all(lengthIsThree).subscribe(observer);

    verify(observer).onNext(false);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorAllTest.java 文件源码
项目:RxJavaFlow
阅读 34
收藏 0
点赞 0
评论 0
/** {@code all} over an empty source must emit {@code true} then complete. */
@Test
@SuppressWarnings("unchecked")
public void testEmpty() {
    Function<String, Boolean> threeLong = new Function<String, Boolean>() {
        @Override
        public Boolean call(String value) {
            return value.length() == 3;
        }
    };
    Observable<String> nothing = Observable.empty();
    Observer<Boolean> observer = mock(Observer.class);

    nothing.all(threeLong).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorDelayTest.java 文件源码
项目:RxJavaFlow
阅读 21
收藏 0
点赞 0
评论 0
/** With a delay selector that returns an empty Observable, the item and completion must still arrive in order. */
@Test
public void testDelayWithObservableEmptyDelayer() {
    PublishSubject<Integer> subject = PublishSubject.create();
    Function<Integer, Observable<Integer>> emptyDelayer = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            return Observable.empty();
        }
    };

    @SuppressWarnings("unchecked")
    Observer<Object> sink = mock(Observer.class);
    InOrder ordered = inOrder(sink);

    subject.delay(emptyDelayer).subscribe(sink);
    subject.onNext(1);
    subject.onComplete();

    ordered.verify(sink).onNext(1);
    ordered.verify(sink).onComplete();
    ordered.verifyNoMoreInteractions();
    verify(sink, never()).onError(any(Throwable.class));
}
OnSubscribeJoinTest.java 文件源码
项目:RxJavaFlow
阅读 24
收藏 0
点赞 0
评论 0
/** A left duration selector that throws must make {@code join} signal onError only. */
@Test
public void leftDurationSelectorThrows() {
    PublishSubject<Integer> left = PublishSubject.create();
    PublishSubject<Integer> right = PublishSubject.create();

    Function<Integer, Observable<Integer>> throwingSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            throw new RuntimeException("Forced failure");
        }
    };

    Observable<Integer> joined = left.join(right, throwingSelector, just(Observable.never()), add);
    joined.subscribe(observer);

    left.onNext(1);

    verify(observer, times(1)).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
    verify(observer, never()).onNext(any());
}
OperatorAnyTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/** {@code exists} over an empty source must emit {@code false} once and complete. */
@Test
public void testAnyWithEmptyAndPredicate() {
    Function<Integer, Boolean> acceptAll = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return true;
        }
    };

    // If the source is empty, always output false.
    Observable<Integer> empty = Observable.empty();
    Observable<Boolean> anyMatch = empty.exists(acceptAll);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    anyMatch.subscribe(observer);

    verify(observer, times(1)).onNext(false);
    verify(observer, never()).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
OperatorObserveOnTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer.
 */
@Test
public void testHotOperatorBackpressure() {
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Observable.timer(0, 1, TimeUnit.MICROSECONDS)
            .observeOn(Schedulers.computation())
            .map(new Function<Long, String>() {
                @Override
                public String call(Long t1) {
                    System.out.println(t1);
                    try {
                        // Deliberately slow consumer compared to the 1 microsecond timer period.
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: restore the flag so the
                        // executing thread can still observe it.
                        Thread.currentThread().interrupt();
                    }
                    return t1 + " slow value";
                }
            }).subscribe(ts);
    ts.awaitTerminalEvent();
    System.out.println("Errors: " + ts.getOnErrorEvents());
    // Exactly one error, and it must be the backpressure failure.
    assertEquals(1, ts.getOnErrorEvents().size());
    assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}
OperatorAnyTest.java 文件源码
项目:RxJavaFlow
阅读 24
收藏 0
点赞 0
评论 0
/** {@code exists} must emit a single {@code false} and complete when no item is below 1. */
@Test
public void testAnyWithPredicate2() {
    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);

    Function<Integer, Boolean> lessThanOne = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value < 1;
        }
    };
    Observable<Integer> numbers = Observable.just(1, 2, 3);
    numbers.exists(lessThanOne).subscribe(observer);

    verify(observer, times(1)).onNext(false);
    verify(observer, never()).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
OnSubscribeUsingTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * With {@code disposeEagerly == false} the recorded event order must be
 * "error", "unsub", "disposed".
 */
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Supplier<Resource> resourceFactory = createResourceFactory(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    final Action0 unsub = createUnsubAction(events);

    // Emits the resource's words and then fails with a RuntimeException.
    Function<Resource, Observable<String>> wordsThenError = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            String[] words = resource.getTextFromWeb().split(" ");
            Observable<String> failure = Observable.<String>error(new RuntimeException());
            return Observable.from(words).concatWith(failure);
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);

    Observable<String> pipeline = Observable
            .using(resourceFactory, wordsThenError, new DisposeAction(), false)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    pipeline.subscribe(observer);

    assertEquals(Arrays.asList("error", "unsub", "disposed"), events);
}
OperatorFirstTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
/** {@code firstOrDefault} must emit the default (2) and complete when no item satisfies the predicate. */
@Test
public void testFirstOrDefaultWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> firstEven = Observable.just(1).firstOrDefault(2, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    firstEven.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(2);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
OnSubscribeUsingTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Runs {@code Observable.using} with a resource factory that throws a {@link TestException}
 * and blocks for the last item, letting whatever the blocking call throws propagate to the caller.
 *
 * @param disposeEagerly forwarded to {@code Observable.using} so that the caller-selected
 *                       disposal mode is the one actually exercised
 */
private void performTestUsingWithResourceFactoryError(boolean disposeEagerly) {
    Supplier<Subscription> resourceFactory = new Supplier<Subscription>() {
        @Override
        public Subscription call() {
            throw new TestException();
        }
    };
    Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Subscription subscription) {
            return Observable.empty();
        }
    };
    // Pass the flag through; previously it was ignored and every caller ran the same path.
    Observable.using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly).toBlocking()
            .last();
}
OperatorDelayTest.java 文件源码
项目:RxJavaFlow
阅读 23
收藏 0
点赞 0
评论 0
/** A delay selector that throws must yield onError only, with no onNext or onComplete. */
@Test
public void testDelayWithObservableDelayFunctionThrows() {
    PublishSubject<Integer> subject = PublishSubject.create();
    Function<Integer, Observable<Integer>> throwingDelayer = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            throw new TestException();
        }
    };

    @SuppressWarnings("unchecked")
    Observer<Object> sink = mock(Observer.class);
    InOrder ordered = inOrder(sink);

    subject.delay(throwingDelayer).subscribe(sink);
    subject.onNext(1);

    ordered.verify(sink).onError(any(TestException.class));
    ordered.verifyNoMoreInteractions();
    verify(sink, never()).onNext(any());
    verify(sink, never()).onComplete();
}
OperatorTakeWhileTest.java 文件源码
项目:RxJavaFlow
阅读 28
收藏 0
点赞 0
评论 0
/** A counting predicate that accepts only its first two calls must let "one" and "two" through, then complete. */
@Test
public void testTakeWhile2() {
    Function<String, Boolean> firstTwoOnly = new Function<String, Boolean>() {
        // Number of predicate invocations so far.
        int calls = 0;

        @Override
        public Boolean call(String input) {
            return calls++ < 2;
        }
    };
    Observable<String> words = Observable.just("one", "two", "three");
    Observable<String> taken = words.takeWhile(firstTwoOnly);

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    taken.subscribe(observer);

    verify(observer, times(1)).onNext("one");
    verify(observer, times(1)).onNext("two");
    verify(observer, never()).onNext("three");
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
OperatorTakeWhileTest.java 文件源码
项目:RxJavaFlow
阅读 26
收藏 0
点赞 0
评论 0
/**
 * The source emits one item and then an error while the predicate rejects everything;
 * the test passes only if the blocking call returns without throwing.
 */
@Test
public void testTakeWhileDoesntLeakErrors() {
    OnSubscribe<String> emitThenFail = new OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            observer.onNext("one");
            observer.onError(new Throwable("test failed"));
        }
    };
    Function<String, Boolean> rejectAll = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return false;
        }
    };

    Observable<String> source = Observable.create(emitThenFail);
    source.takeWhile(rejectAll).toBlocking().lastOrDefault("");
}
OperatorTakeWhileTest.java 文件源码
项目:RxJavaFlow
阅读 34
收藏 0
点赞 0
评论 0
/** An exception thrown by the takeWhile predicate must be delivered through onError, with no onNext. */
@Test
public void testTakeWhileProtectsPredicateCall() {
    final RuntimeException testException = new RuntimeException("test exception");
    TestObservable source = new TestObservable(mock(Subscription.class), "one");

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);

    Function<String, Boolean> throwingPredicate = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            throw testException;
        }
    };
    Observable<String> taken = Observable.create(source).takeWhile(throwingPredicate);
    taken.subscribe(observer);

    // wait for the Observable to complete
    try {
        source.t.join();
    } catch (Throwable e) {
        e.printStackTrace();
        fail(e.getMessage());
    }

    verify(observer, never()).onNext(any(String.class));
    verify(observer, times(1)).onError(testException);
}
OnSubscribeUsingTest.java 文件源码
项目:RxJavaFlow
阅读 23
收藏 0
点赞 0
评论 0
/**
 * With {@code disposeEagerly == true} the recorded event order must be
 * "disposed", "error", "unsub".
 */
@Test
public void testUsingDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Supplier<Resource> resourceFactory = createResourceFactory(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    final Action0 unsub = createUnsubAction(events);

    // Emits the resource's words and then fails with a RuntimeException.
    Function<Resource, Observable<String>> wordsThenError = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            String[] words = resource.getTextFromWeb().split(" ");
            Observable<String> failure = Observable.<String>error(new RuntimeException());
            return Observable.from(words).concatWith(failure);
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);

    Observable<String> pipeline = Observable
            .using(resourceFactory, wordsThenError, new DisposeAction(), true)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    pipeline.subscribe(observer);

    assertEquals(Arrays.asList("disposed", "error", "unsub"), events);
}
OperatorLastTest.java 文件源码
项目:RxJavaFlow
阅读 20
收藏 0
点赞 0
评论 0
/** {@code last} with an even-number predicate over 1..6 must emit 6 and then complete. */
@Test
public void testLastWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> lastEven = Observable.just(1, 2, 3, 4, 5, 6).last(isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    lastEven.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(6);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
OperatorLastTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/** {@code last} with a predicate no item satisfies must signal a {@link NoSuchElementException}. */
@Test
public void testLastWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> lastEven = Observable.just(1).last(isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    lastEven.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onError(isA(NoSuchElementException.class));
    ordered.verifyNoMoreInteractions();
}
OperatorLastTest.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
/** {@code lastOrDefault} must emit the last even item (6), not the default (8), and then complete. */
@Test
public void testLastOrDefaultWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5, 6);
    Observable<Integer> lastEven = numbers.lastOrDefault(8, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    lastEven.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(6);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}