java类rx.functions.Function的实例源码

OnSubscribeRedo.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Rewrites the stream of terminal notifications: while fewer than {@code count}
 * notifications have been seen, each one is replaced by an {@code onNext} notification
 * carrying the running tally; afterwards (or whenever {@code count} is zero) the incoming
 * notification is passed through unchanged. The mapped stream is then dematerialized.
 *
 * <p>{@code count} is declared outside this snippet.
 */
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
    return ts.map(new Function<Notification<?>, Notification<?>>() {

        // Number of notifications seen so far by this mapper instance.
        int seen = 0;

        @Override
        public Notification<?> call(Notification<?> terminalNotification) {
            if (count != 0) {
                seen++;
                if (seen <= count) {
                    return Notification.createOnNext(seen);
                }
            }
            // Either count is zero or the budget is used up: forward the terminal event.
            return terminalNotification;
        }

    }).dematerialize();
}
OperatorTakeTest.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
/**
 * A {@code map} function that always throws is applied after {@code take(2)}; the observer
 * must receive exactly one {@code onError} with an IllegalArgumentException and nothing else.
 */
@Test
public void testTakeWithErrorHappeningInOnNext() {
    Function<Integer, Integer> alwaysThrows = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer value) {
            throw new IllegalArgumentException("some error");
        }
    };
    Observable<Integer> failing = Observable.from(Arrays.asList(1, 2, 3)).take(2).map(alwaysThrows);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    failing.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer).onError(any(IllegalArgumentException.class));
    ordered.verifyNoMoreInteractions();
}
OperatorDoOnEachTest.java 文件源码 项目:RxJavaFlow 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Regression test for RxJava issue 1451: runs a {@code takeWhile -> toList -> doOnNext}
 * pipeline over (TRUE, FALSE) once per entry of {@code nums} and checks that the
 * {@code doOnNext} action fired once per run.
 */
@Test
public void testIssue1451Case1() {
    // https://github.com/Netflix/RxJava/issues/1451
    final int[] nums = { 1, 2, 3 };
    final AtomicInteger count = new AtomicInteger();

    // Passes each Boolean through as its own predicate result.
    Function<Boolean, Boolean> asIs = new Function<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean flag) {
            return flag;
        }
    };
    // Counts how many lists reach doOnNext.
    Action1<List<Boolean>> tally = new Action1<List<Boolean>>() {
        @Override
        public void call(List<Boolean> collected) {
            count.incrementAndGet();
        }
    };

    for (int run = 0; run < nums.length; run++) {
        Observable
                .just(Boolean.TRUE, Boolean.FALSE)
                .takeWhile(asIs)
                .toList()
                .doOnNext(tally)
                .subscribe();
    }
    assertEquals(nums.length, count.get());
}
OperatorDebounceTest.java 文件源码 项目:RxJavaFlow 阅读 29 收藏 0 点赞 0 评论 0
/**
 * When the debounce selector function itself throws a TestException, the observer must get
 * {@code onError} with that exception type and neither {@code onNext} nor {@code onComplete}.
 */
@Test
public void debounceSelectorFuncThrows() {
    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    Function<Integer, Observable<Integer>> throwingSelector = new Function<Integer, Observable<Integer>>() {

        @Override
        public Observable<Integer> call(Integer value) {
            throw new TestException();
        }
    };

    PublishSubject<Integer> subject = PublishSubject.create();
    subject.debounce(throwingSelector).subscribe(o);

    subject.onNext(1);

    verify(o).onError(any(TestException.class));
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
OperatorDoOnEachTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Regression test for RxJava issue 1451, second variant: the source is (TRUE, FALSE, FALSE),
 * so items remain after {@code takeWhile} stops. The {@code doOnNext} action must still fire
 * exactly once per run.
 */
@Test
public void testIssue1451Case2() {
    // https://github.com/Netflix/RxJava/issues/1451
    final AtomicInteger count = new AtomicInteger();
    int[] nums = { 1, 2, 3 };
    for (final int ignored : nums) {
        Function<Boolean, Boolean> whileTrue = new Function<Boolean, Boolean>() {
            @Override
            public Boolean call(Boolean flag) {
                return flag;
            }
        };
        Action1<List<Boolean>> countList = new Action1<List<Boolean>>() {
            @Override
            public void call(List<Boolean> collected) {
                count.incrementAndGet();
            }
        };
        Observable
                .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
                .takeWhile(whileTrue)
                .toList()
                .doOnNext(countList)
                .subscribe();
    }
    assertEquals(nums.length, count.get());
}
OperatorAnyTest.java 文件源码 项目:RxJavaFlow 阅读 23 收藏 0 点赞 0 评论 0
/**
 * {@code exists} with the predicate {@code t1 < 2} over (1, 2, 3) must emit {@code true}
 * once, never {@code false}, then complete without error.
 */
@Test
public void testAnyWithPredicate1() {
    Function<Integer, Boolean> lessThanTwo = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value < 2;
        }
    };
    Observable<Boolean> observable = Observable.just(1, 2, 3).exists(lessThanTwo);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    observable.subscribe(observer);

    verify(observer).onNext(true);
    verify(observer, never()).onNext(false);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer).onComplete();
}
OperatorToMultimapTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * The value selector throws for the element "b"; {@code toMultimap} must surface that as a
 * single {@code onError} and must neither emit the would-be map nor complete.
 *
 * <p>{@code lengthFunc} and {@code objectObserver} are declared outside this snippet.
 */
@Test
public void testToMultimapWithErrorInValueSelector() {
    // Doubles every string except "b", for which it fails.
    Function<String, String> duplicateErr = new Function<String, String>() {
        @Override
        public String call(String value) {
            if (!"b".equals(value)) {
                return value + value;
            }
            throw new RuntimeException("Forced failure");
        }
    };

    // The map that would have been produced had the selector not failed.
    Map<Integer, Collection<String>> expected = new HashMap<Integer, Collection<String>>();
    expected.put(1, Arrays.asList("aa", "bb"));
    expected.put(2, Arrays.asList("cccc", "dddd"));

    Observable<String> source = Observable.just("a", "b", "cc", "dd");
    Observable<Map<Integer, Collection<String>>> mapped = source.toMultimap(lengthFunc, duplicateErr);
    mapped.subscribe(objectObserver);

    verify(objectObserver).onError(any(Throwable.class));
    verify(objectObserver, never()).onNext(expected);
    verify(objectObserver, never()).onComplete();
}
GroupByTests.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Merges two event streams, groups them by {@code event.type}, takes only the first group and
 * prints it via a blocking {@code forEach}; then prints a completion marker.
 */
@Test
public void testTakeUnsubscribesOnGroupBy() {
    // Key selector: group by type (2 clusters).
    Function<Event, String> byType = new Function<Event, String>() {

        @Override
        public String call(Event event) {
            return event.type;
        }

    };
    Action1<GroupedObservable<String, Event>> printGroup = new Action1<GroupedObservable<String, Event>>() {

        @Override
        public void call(GroupedObservable<String, Event> group) {
            System.out.println(group);
        }

    };

    Observable.merge(
            EventStream.getEventStream("HTTP-ClusterA", 50),
            EventStream.getEventStream("HTTP-ClusterB", 20))
            .groupBy(byType)
            .take(1)
            .toBlocking().forEach(printGroup);

    System.out.println("**** finished");
}
OnSubscribeUsingTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Runs {@code Observable.using} with an observable factory that throws a TestException and
 * checks that the resource's unsubscribe action is still invoked exactly once.
 *
 * @param disposeEagerly forwarded to {@code Observable.using} so that the eager and the
 *                       non-eager disposal modes are each exercised by their callers
 */
private void performTestUsingWithObservableFactoryError(boolean disposeEagerly) {
    final Action0 unsubscribe = mock(Action0.class);
    Supplier<Subscription> resourceFactory = new Supplier<Subscription>() {
        @Override
        public Subscription call() {
            return Subscriptions.create(unsubscribe);
        }
    };

    Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Subscription subscription) {
            throw new TestException();
        }
    };

    try {
        // Pass disposeEagerly through; previously the parameter was ignored and both
        // modes ran the same default overload.
        Observable.using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly)
                .toBlocking()
                .last();
        fail("Should throw a TestException when the observableFactory throws it");
    } catch (TestException e) {
        // Make sure that unsubscribe is called so that users can close
        // the resource if some error happens.
        verify(unsubscribe, times(1)).call();
    }
}
OperatorAnyTest.java 文件源码 项目:RxJavaFlow 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Applies {@code exists} with the predicate {@code t1 < 2} to (1, 2, 3) and expects a single
 * {@code true}, no {@code false}, no error, and one completion.
 */
@Test
public void testExists1() {
    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);

    Observable<Integer> numbers = Observable.just(1, 2, 3);
    Observable<Boolean> anyBelowTwo = numbers.exists(
            new Function<Integer, Boolean>() {

                @Override
                public Boolean call(Integer candidate) {
                    return candidate < 2;
                }
            });
    anyBelowTwo.subscribe(observer);

    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, never()).onNext(false);
    verify(observer).onNext(true);
    verify(observer).onComplete();
}
OperatorDebounceTest.java 文件源码 项目:RxJavaFlow 阅读 45 收藏 0 点赞 0 评论 0
/**
 * When the debounce selector returns an Observable that signals a TestException, the
 * observer must receive {@code onError} with that type and no {@code onNext}/{@code onComplete}.
 */
@Test
public void debounceSelectorObservableThrows() {
    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    Function<Integer, Observable<Integer>> erroringSelector = new Function<Integer, Observable<Integer>>() {

        @Override
        public Observable<Integer> call(Integer value) {
            return Observable.error(new TestException());
        }
    };

    PublishSubject<Integer> subject = PublishSubject.create();
    subject.debounce(erroringSelector).subscribe(o);

    subject.onNext(1);

    verify(o).onError(any(TestException.class));
    verify(o, never()).onComplete();
    verify(o, never()).onNext(any());
}
OperatorAllTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Every string in ("one", "two", "six") has length 3, so {@code all} must emit {@code true},
 * complete, and do nothing else.
 */
@Test
@SuppressWarnings("unchecked")
public void testAll() {
    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String word) {
            return word.length() == 3;
        }
    };
    Observer<Boolean> observer = mock(Observer.class);

    Observable<String> words = Observable.just("one", "two", "six");
    words.all(hasLengthThree).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorAllTest.java 文件源码 项目:RxJavaFlow 阅读 23 收藏 0 点赞 0 评论 0
/**
 * "three" does not have length 3, so {@code all} over ("one", "two", "three", "six") must
 * emit {@code false}, complete, and do nothing else.
 */
@Test
@SuppressWarnings("unchecked")
public void testNotAll() {
    Observer<Boolean> observer = mock(Observer.class);

    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String word) {
            return word.length() == 3;
        }
    };
    Observable<Boolean> verdict = Observable.just("one", "two", "three", "six").all(hasLengthThree);
    verdict.subscribe(observer);

    verify(observer).onNext(false);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorAllTest.java 文件源码 项目:RxJavaFlow 阅读 34 收藏 0 点赞 0 评论 0
/**
 * {@code all} over an empty source must emit {@code true}, complete, and do nothing else.
 */
@Test
@SuppressWarnings("unchecked")
public void testEmpty() {
    Observable<String> nothing = Observable.empty();
    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String word) {
            return word.length() == 3;
        }
    };

    Observer<Boolean> observer = mock(Observer.class);
    nothing.all(hasLengthThree).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
OperatorDelayTest.java 文件源码 项目:RxJavaFlow 阅读 21 收藏 0 点赞 0 评论 0
/**
 * With a per-item delay Observable that is empty, the delayed source must still deliver the
 * item and then the completion, in that order, with no error.
 */
@Test
public void testDelayWithObservableEmptyDelayer() {
    Function<Integer, Observable<Integer>> emptyDelay = new Function<Integer, Observable<Integer>>() {

        @Override
        public Observable<Integer> call(Integer value) {
            return Observable.empty();
        }
    };
    PublishSubject<Integer> subject = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    subject.delay(emptyDelay).subscribe(o);

    subject.onNext(1);
    subject.onComplete();

    ordered.verify(o).onNext(1);
    ordered.verify(o).onComplete();
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onError(any(Throwable.class));
}
OnSubscribeJoinTest.java 文件源码 项目:RxJavaFlow 阅读 24 收藏 0 点赞 0 评论 0
/**
 * The left duration selector of {@code join} throws; after the left source emits, the
 * observer must see one {@code onError} and neither {@code onNext} nor {@code onComplete}.
 *
 * <p>{@code just}, {@code add} and {@code observer} are declared outside this snippet.
 */
@Test
public void leftDurationSelectorThrows() {
    Function<Integer, Observable<Integer>> fail = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer value) {
            throw new RuntimeException("Forced failure");
        }
    };

    PublishSubject<Integer> left = PublishSubject.create();
    PublishSubject<Integer> right = PublishSubject.create();

    Observable<Integer> joined = left.join(right, fail, just(Observable.never()), add);
    joined.subscribe(observer);

    left.onNext(1);

    verify(observer).onError(any(Throwable.class));
    verify(observer, never()).onNext(any());
    verify(observer, never()).onComplete();
}
OperatorAnyTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * {@code exists} over an empty source must emit {@code false} once even when the predicate
 * would accept everything, then complete without error.
 */
@Test
public void testAnyWithEmptyAndPredicate() {
    // If the source is empty, always output false.
    Function<Integer, Boolean> acceptAll = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return true;
        }
    };
    Observable<Integer> nothing = Observable.empty();
    Observable<Boolean> observable = nothing.exists(acceptAll);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    observable.subscribe(observer);

    verify(observer).onNext(false);
    verify(observer, never()).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer).onComplete();
}
OperatorObserveOnTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer.
 *
 * <p>The source ticks every microsecond while the {@code map} stage sleeps 100 ms per item;
 * the test then expects exactly one error of type MissingBackpressureException.
 */
@Test
public void testHotOperatorBackpressure() {
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Observable.timer(0, 1, TimeUnit.MICROSECONDS)
            .observeOn(Schedulers.computation())
            .map(new Function<Long, String>() {

                @Override
                public String call(Long t1) {
                    System.out.println(t1);
                    try {
                        // Deliberately slow consumer.
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Do not swallow the interruption: restore the flag so the code
                        // running this function can still observe it.
                        Thread.currentThread().interrupt();
                    }
                    return t1 + " slow value";
                }

            }).subscribe(ts);

    ts.awaitTerminalEvent();
    System.out.println("Errors: " + ts.getOnErrorEvents());
    assertEquals(1, ts.getOnErrorEvents().size());
    assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}
OperatorAnyTest.java 文件源码 项目:RxJavaFlow 阅读 24 收藏 0 点赞 0 评论 0
/**
 * No element of (1, 2, 3) satisfies {@code t1 < 1}, so {@code exists} must emit {@code false}
 * once, never {@code true}, then complete without error.
 */
@Test
public void testAnyWithPredicate2() {
    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);

    Function<Integer, Boolean> lessThanOne = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value < 1;
        }
    };
    Observable<Integer> numbers = Observable.just(1, 2, 3);
    numbers.exists(lessThanOne).subscribe(observer);

    verify(observer, never()).onNext(true);
    verify(observer).onNext(false);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer).onComplete();
}
OnSubscribeUsingTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * With {@code disposeEagerly == false}, a source that ends in an error must record the events
 * in the order "error", "unsub", "disposed".
 *
 * <p>The {@code create*} helpers and {@code DisposeAction} are declared outside this snippet.
 */
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    final Action0 unsub = createUnsubAction(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    Supplier<Resource> resourceFactory = createResourceFactory(events);

    // Emits the resource text split on spaces, then fails with a RuntimeException.
    Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            Observable<String> words = Observable.from(resource.getTextFromWeb().split(" "));
            return words.concatWith(Observable.<String>error(new RuntimeException()));
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    Observable<String> observable = Observable
            .using(resourceFactory, observableFactory, new DisposeAction(), false)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    observable.subscribe(observer);

    assertEquals(Arrays.asList("error", "unsub", "disposed"), events);
}
OperatorFirstTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
/**
 * The only element (1) fails the even-number predicate, so {@code firstOrDefault(2, ...)}
 * must emit the default 2 and then complete, in that order, with nothing else.
 */
@Test
public void testFirstOrDefaultWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1).firstOrDefault(2, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer).onNext(2);
    ordered.verify(observer).onComplete();
    ordered.verifyNoMoreInteractions();
}
OnSubscribeUsingTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Runs {@code Observable.using} with a resource factory that throws a TestException and
 * blocks on the result; any exception raised by the blocking call propagates to the caller.
 *
 * @param disposeEagerly forwarded to {@code Observable.using} so that the eager and the
 *                       non-eager disposal modes are each exercised by their callers
 */
private void performTestUsingWithResourceFactoryError(boolean disposeEagerly) {
    Supplier<Subscription> resourceFactory = new Supplier<Subscription>() {
        @Override
        public Subscription call() {
            throw new TestException();
        }
    };

    Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Subscription subscription) {
            return Observable.empty();
        }
    };

    // Pass disposeEagerly through; previously the parameter was ignored and both
    // modes ran the same default overload.
    Observable.using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly)
            .toBlocking()
            .last();
}
OperatorDelayTest.java 文件源码 项目:RxJavaFlow 阅读 23 收藏 0 点赞 0 评论 0
/**
 * When the per-item delay function throws a TestException, the observer must receive only
 * {@code onError} with that type and no {@code onNext} or {@code onComplete}.
 */
@Test
public void testDelayWithObservableDelayFunctionThrows() {
    Function<Integer, Observable<Integer>> throwingDelay = new Function<Integer, Observable<Integer>>() {

        @Override
        public Observable<Integer> call(Integer value) {
            throw new TestException();
        }
    };
    PublishSubject<Integer> subject = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    subject.delay(throwingDelay).subscribe(o);
    subject.onNext(1);

    ordered.verify(o).onError(any(TestException.class));
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onComplete();
    verify(o, never()).onNext(any());
}
OperatorTakeWhileTest.java 文件源码 项目:RxJavaFlow 阅读 28 收藏 0 点赞 0 评论 0
/**
 * Uses a counting predicate that accepts only its first two invocations; of
 * ("one", "two", "three") only the first two must be delivered, followed by completion.
 */
@Test
public void testTakeWhile2() {
    Function<String, Boolean> firstTwoCalls = new Function<String, Boolean>() {
        // Number of times the predicate has been invoked so far.
        int calls = 0;

        @Override
        public Boolean call(String input) {
            boolean keep = calls < 2;
            calls++;
            return keep;
        }
    };
    Observable<String> take = Observable.just("one", "two", "three").takeWhile(firstTwoCalls);

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    take.subscribe(observer);

    verify(observer).onNext("one");
    verify(observer).onNext("two");
    verify(observer, never()).onNext("three");
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer).onComplete();
}
OperatorTakeWhileTest.java 文件源码 项目:RxJavaFlow 阅读 26 收藏 0 点赞 0 评论 0
/**
 * The source emits one item and then an error, while the predicate rejects the first item.
 * The test passes as long as the blocking {@code lastOrDefault("")} call returns normally.
 */
@Test
public void testTakeWhileDoesntLeakErrors() {
    OnSubscribe<String> oneThenError = new OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("one");
            subscriber.onError(new Throwable("test failed"));
        }
    };
    Function<String, Boolean> rejectAll = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return false;
        }
    };

    Observable<String> source = Observable.create(oneThenError);
    source.takeWhile(rejectAll).toBlocking().lastOrDefault("");
}
OperatorTakeWhileTest.java 文件源码 项目:RxJavaFlow 阅读 34 收藏 0 点赞 0 评论 0
/**
 * A predicate that throws must cause the thrown exception instance to be delivered via
 * {@code onError}, with no {@code onNext} calls.
 *
 * <p>{@code TestObservable} (and its thread field {@code t}) is declared outside this snippet.
 */
@Test
public void testTakeWhileProtectsPredicateCall() {
    final RuntimeException testException = new RuntimeException("test exception");
    Function<String, Boolean> throwingPredicate = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            throw testException;
        }
    };
    TestObservable source = new TestObservable(mock(Subscription.class), "one");

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    Observable<String> take = Observable.create(source).takeWhile(throwingPredicate);
    take.subscribe(observer);

    // wait for the Observable to complete
    try {
        source.t.join();
    } catch (Throwable e) {
        e.printStackTrace();
        fail(e.getMessage());
    }

    verify(observer).onError(testException);
    verify(observer, never()).onNext(any(String.class));
}
OnSubscribeUsingTest.java 文件源码 项目:RxJavaFlow 阅读 23 收藏 0 点赞 0 评论 0
/**
 * With {@code disposeEagerly == true}, a source that ends in an error must record the events
 * in the order "disposed", "error", "unsub".
 *
 * <p>The {@code create*} helpers and {@code DisposeAction} are declared outside this snippet.
 */
@Test
public void testUsingDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Supplier<Resource> resourceFactory = createResourceFactory(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    final Action0 unsub = createUnsubAction(events);

    // Emits the resource text split on spaces, then fails with a RuntimeException.
    Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            Observable<String> words = Observable.from(resource.getTextFromWeb().split(" "));
            return words.concatWith(Observable.<String>error(new RuntimeException()));
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    Observable<String> eager = Observable
            .using(resourceFactory, observableFactory, new DisposeAction(), true)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    eager.subscribe(observer);

    List<String> expectedOrder = Arrays.asList("disposed", "error", "unsub");
    assertEquals(expectedOrder, events);
}
OperatorLastTest.java 文件源码 项目:RxJavaFlow 阅读 20 收藏 0 点赞 0 评论 0
/**
 * {@code last} with an even-number predicate over 1..6 must emit 6 and then complete, in that
 * order, with nothing else.
 */
@Test
public void testLastWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6).last(isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer).onNext(6);
    ordered.verify(observer).onComplete();
    ordered.verifyNoMoreInteractions();
}
OperatorLastTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * The only element (1) fails the even-number predicate, so {@code last} must signal a
 * NoSuchElementException via {@code onError} and nothing else.
 */
@Test
public void testLastWithPredicateAndEmpty() {
    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);

    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> lastEven = Observable.just(1).last(isEven);
    lastEven.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer).onError(isA(NoSuchElementException.class));
    ordered.verifyNoMoreInteractions();
}
OperatorLastTest.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
/**
 * {@code lastOrDefault(8, ...)} with an even-number predicate over 1..6 must emit 6 (not the
 * default) and then complete, in that order, with nothing else.
 */
@Test
public void testLastOrDefaultWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {

        @Override
        public Boolean call(Integer value) {
            return value % 2 == 0;
        }
    };
    Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5, 6);
    Observable<Integer> observable = numbers.lastOrDefault(8, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer).onNext(6);
    ordered.verify(observer).onComplete();
    ordered.verifyNoMoreInteractions();
}


问题


面经


文章

微信
公众号

扫码关注公众号