private void setMockMode(boolean toggle) {
if (toggle) {
mockLocationSubscription =
Observable.zip(locationProvider.mockLocation(mockLocationObservable),
mockLocationObservable, new Func2<Status, Location, String>() {
int count = 0;
@Override
public String call(Status result, Location location) {
return new LocationToStringFunc().call(location) + " " + count++;
}
})
.subscribe(new DisplayTextOnViewAction(mockLocationView), new ErrorHandler());
} else {
mockLocationSubscription.unsubscribe();
}
}
java类rx.functions.Func2的实例源码
MockLocationsActivity.java 文件源码
项目:GitHub
阅读 59
收藏 0
点赞 0
评论 0
RetryWhenNetworkException.java 文件源码
项目:Bailan
阅读 30
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
//压缩规则 合并后的结果是一个Observable<Wrapper>
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
//转换规则
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetworkException.java 文件源码
项目:RxRetrofit-tokean
阅读 27
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetWorkException.java 文件源码
项目:TestChat
阅读 24
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, retryCount + 1), new Func2<Throwable, Integer, ExceptionWrapper>() {
@Override
public ExceptionWrapper call(Throwable throwable, Integer integer) {
return new ExceptionWrapper(integer, throwable);
}
}).flatMap(new Func1<ExceptionWrapper, Observable<?>>() {
@Override
public Observable<?> call(ExceptionWrapper exceptionWrapper) {
if ((exceptionWrapper.throwable instanceof ConnectException ||
exceptionWrapper.throwable instanceof SocketException ||
exceptionWrapper.throwable instanceof TimeoutException) && exceptionWrapper.index < retryCount + 1) {
return Observable.timer(delayTime + (exceptionWrapper.index - 1) * delayTime, java.util.concurrent.TimeUnit.MILLISECONDS);
}
return Observable.error(exceptionWrapper.throwable);
}
});
}
WebAppBaseImpl.java 文件源码
项目:azure-libraries-for-java
阅读 25
收藏 0
点赞 0
评论 0
@Override
public Observable<Map<String, AppSetting>> getAppSettingsAsync() {
return Observable.zip(listAppSettings(), listSlotConfigurations(), new Func2<StringDictionaryInner, SlotConfigNamesResourceInner, Map<String, AppSetting>>() {
@Override
public Map<String, AppSetting> call(final StringDictionaryInner appSettingsInner, final SlotConfigNamesResourceInner slotConfigs) {
if (appSettingsInner == null || appSettingsInner.properties() == null) {
return null;
}
return Maps.asMap(appSettingsInner.properties().keySet(), new Function<String, AppSetting>() {
@Override
public AppSetting apply(String input) {
return new AppSettingImpl(input, appSettingsInner.properties().get(input),
slotConfigs != null && slotConfigs.appSettingNames() != null && slotConfigs.appSettingNames().contains(input));
}
});
}
});
}
WebAppBaseImpl.java 文件源码
项目:azure-libraries-for-java
阅读 23
收藏 0
点赞 0
评论 0
@Override
public Observable<Map<String, ConnectionString>> getConnectionStringsAsync() {
return Observable.zip(listConnectionStrings(), listSlotConfigurations(), new Func2<ConnectionStringDictionaryInner, SlotConfigNamesResourceInner, Map<String, ConnectionString>>() {
@Override
public Map<String, ConnectionString> call(final ConnectionStringDictionaryInner connectionStringsInner, final SlotConfigNamesResourceInner slotConfigs) {
if (connectionStringsInner == null || connectionStringsInner.properties() == null) {
return null;
}
return Maps.asMap(connectionStringsInner.properties().keySet(), new Function<String, ConnectionString>() {
@Override
public ConnectionString apply(String input) {
return new ConnectionStringImpl(input, connectionStringsInner.properties().get(input),
slotConfigs != null && slotConfigs.connectionStringNames() != null && slotConfigs.connectionStringNames().contains(input));
}
});
}
});
}
AppService.java 文件源码
项目:disclosure-android-app
阅读 27
收藏 0
点赞 0
评论 0
public Func2<AppReport, AppReport, Integer> getSortingFunction(SortBy sortBy) {
switch (sortBy) {
case NAME:
return new SortByName();
case LIBRARY_COUNT:
return new SortByLibraryCount();
case ANALYZED_AT:
return new SortByAnalyzedAt();
case PERMISSION_COUNT:
return new SortByPermissionCount();
default:
throw new IllegalArgumentException("no sorting function for %s " + sortBy);
}
}
RxIdlingResource.java 文件源码
项目:GongXianSheng
阅读 21
收藏 0
点赞 0
评论 0
private void setupHooks() {
RxJavaHooks.setOnObservableStart(new Func2<Observable, OnSubscribe, OnSubscribe>() {
@Override
public OnSubscribe call(Observable observable, OnSubscribe onSubscribe) {
incrementActiveSubscriptionsCount();
return onSubscribe;
}
});
RxJavaHooks.setOnObservableSubscribeError(new Func1<Throwable, Throwable>() {
@Override
public Throwable call(Throwable throwable) {
decrementActiveSubscriptionsCount();
return throwable;
}
});
RxJavaHooks.setOnObservableReturn(new Func1<Subscription, Subscription>() {
@Override
public Subscription call(Subscription subscription) {
decrementActiveSubscriptionsCount();
return subscription;
}
});
}
OperatorSequenceEqual.java 文件源码
项目:boohee_v5.6
阅读 22
收藏 0
点赞 0
评论 0
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, final Func2<? super T, ? super T, Boolean> equality) {
return Observable.zip(materializeLite(first), materializeLite(second), new Func2<Object, Object, Boolean>() {
public Boolean call(Object t1, Object t2) {
boolean c1;
if (t1 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) {
c1 = true;
} else {
c1 = false;
}
boolean c2;
if (t2 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) {
c2 = true;
} else {
c2 = false;
}
if (c1 && c2) {
return Boolean.valueOf(true);
}
if (c1 || c2) {
return Boolean.valueOf(false);
}
return (Boolean) equality.call(t1, t2);
}
}).all(UtilityFunctions.identity());
}
PostItemsListFragment.java 文件源码
项目:mimi-reader
阅读 22
收藏 0
点赞 0
评论 0
private Func2<ChanCatalog, List<HiddenThread>, ChanCatalog> hideThreads() {
return new Func2<ChanCatalog, List<HiddenThread>, ChanCatalog>() {
@Override
public ChanCatalog call(ChanCatalog chanCatalog, List<HiddenThread> hiddenThreads) {
if (chanCatalog != null) {
List<ChanPost> posts = new ArrayList<>();
for (ChanPost post : chanCatalog.getPosts()) {
boolean found = false;
for (HiddenThread hiddenThread : hiddenThreads) {
if (hiddenThread.threadId == post.getNo()) {
found = true;
}
}
if (!found) {
posts.add(post);
}
}
chanCatalog.setPosts(posts);
}
return chanCatalog;
}
};
}
RxJavaCallAdapterFactory.java 文件源码
项目:HttpService
阅读 23
收藏 0
点赞 0
评论 0
@Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
new Func2<Throwable, Integer, InnerThrowable>() {
@Override public InnerThrowable call(Throwable throwable, Integer i) {
if (throwable instanceof IOException) return new InnerThrowable(throwable, i);
return new InnerThrowable(throwable, i);
}
}).concatMap(new Func1<InnerThrowable, Observable<Long>>() {
@Override public Observable<Long> call(InnerThrowable innerThrowable) {
Integer currentCount = innerThrowable.getCurrentRetryCount();
if (RetryWhenFunc.this.maxConnectCount.equals(currentCount)) {
return Observable.error(innerThrowable.getThrowable());
}
/*use Schedulers#immediate() to keep on same thread */
return Observable.timer((long) Math.pow(2, currentCount), TimeUnit.SECONDS,
Schedulers.immediate());
}
});
}
CommentPresenter.java 文件源码
项目:zhihudailysoap
阅读 20
收藏 0
点赞 0
评论 0
@Override
public void fetchDataByNetWork(final int newsID) {
Observable<Comments> short_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).shortComment(newsID);
Observable<Comments> long_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).longComment(newsID);
Observable.zip(short_comments_service, long_comments_service, new Func2<Comments, Comments, Comments1>(){
@Override
public Comments1 call(Comments comments, Comments comments2) {
Comments1 comments1 = new Comments1();
comments1.long_comments = comments.comments;
comments1.short_comments = comments2.comments;
return comments1;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
GankPresenter.java 文件源码
项目:yApp
阅读 19
收藏 0
点赞 0
评论 0
@Override
public Observable<List<Gank>> getObservable() {
if (resId == -1) return null;
return getDataSupports().getGankData(type, mCurrentPage, PAGE_SIZE)
.map(new Func1<GankData, List<Gank>>() {
@Override
public List<Gank> call(GankData gankData) {
return gankData.getResults();
}
})
.flatMap(new Func1<List<Gank>, Observable<Gank>>() {
@Override
public Observable<Gank> call(List<Gank> ganks) {
return Observable.from(ganks);
}
})
.toSortedList(new Func2<Gank, Gank, Integer>() {
@Override
public Integer call(Gank gank, Gank gank2) {
return gank2.getPublishedAt().compareTo(gank.getPublishedAt());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
DataManager.java 文件源码
项目:Mondroid
阅读 24
收藏 0
点赞 0
评论 0
public Single<AccountData> getAccountDetails() {
return getAccounts()
.flatMap(new Func1<AccountsResponse, Single<? extends AccountData>>() {
@Override
public Single<? extends AccountData> call(AccountsResponse accountsResponse) {
if (!accountsResponse.accounts.isEmpty()) {
String accountId = accountsResponse.accounts.get(0).id;
return Single.zip(getBalance(accountId), getTransactions(accountId),
new Func2<Balance, List<Transaction>, AccountData>() {
@Override
public AccountData call(Balance balance,
List<Transaction> transactions) {
return new AccountData(balance, transactions);
}
});
}
return Single.just(new AccountData());
}
});
}
ApiPaginator.java 文件源码
项目:android-oss
阅读 21
收藏 0
点赞 0
评论 0
private ApiPaginator(
final @NonNull Observable<Void> nextPage,
final @NonNull Observable<Params> startOverWith,
final @NonNull Func1<Envelope, List<Data>> envelopeToListOfData,
final @NonNull Func1<Params, Observable<Envelope>> loadWithParams,
final @NonNull Func1<String, Observable<Envelope>> loadWithPaginationPath,
final @NonNull Func1<Envelope, String> envelopeToMoreUrl,
final @NonNull Func1<List<Data>, List<Data>> pageTransformation,
final boolean clearWhenStartingOver,
final @NonNull Func2<List<Data>, List<Data>, List<Data>> concater,
final boolean distinctUntilChanged
) {
this.nextPage = nextPage;
this.startOverWith = startOverWith;
this.envelopeToListOfData = envelopeToListOfData;
this.loadWithParams = loadWithParams;
this.envelopeToMoreUrl = envelopeToMoreUrl;
this.pageTransformation = pageTransformation;
this.loadWithPaginationPath = loadWithPaginationPath;
this.clearWhenStartingOver = clearWhenStartingOver;
this.concater = concater;
this.distinctUntilChanged = distinctUntilChanged;
this.paginatedData = this.startOverWith.switchMap(this::dataWithPagination);
this.loadingPage = this.startOverWith.switchMap(__ -> nextPage.scan(1, (accum, ___) -> accum + 1));
}
GoogleSearchController.java 文件源码
项目:FloatingSearchView
阅读 21
收藏 0
点赞 0
评论 0
private Observable<SearchResult[]> getQueryObservable(String query) {
return mSearch.search(query)
.flatMap(new Func1<Response, Observable<SearchResult[]>>() {
@Override
public Observable<SearchResult[]> call(Response response) {
if (response.responseData == null)
return Observable.error(new SearchException(response.responseDetails));
return Observable.just(response.responseData.results);
}
})
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
return throwable instanceof InterruptedIOException;
}
});
}
RetryWhenNetworkException.java 文件源码
项目:RxjavaRetrofitDemo-string-master
阅读 23
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetworkException.java 文件源码
项目:RxjavaRetrofitDemo-master
阅读 33
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetworkException.java 文件源码
项目:collapselrecycler
阅读 25
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetworkException.java 文件源码
项目:Rx-Retrofit
阅读 26
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
RetryWhenNetworkException.java 文件源码
项目:RxRetrofit-mvp
阅读 28
收藏 0
点赞 0
评论 0
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
@Override
public Wrapper call(Throwable throwable, Integer integer) {
return new Wrapper(throwable, integer);
}
}).flatMap(new Func1<Wrapper, Observable<?>>() {
@Override
public Observable<?> call(Wrapper wrapper) {
if ((wrapper.throwable instanceof ConnectException
|| wrapper.throwable instanceof SocketTimeoutException
|| wrapper.throwable instanceof TimeoutException)
&& wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
}
return Observable.error(wrapper.throwable);
}
});
}
NewestTopicsFragment.java 文件源码
项目:V2EX-Android
阅读 22
收藏 0
点赞 0
评论 0
@Override
public void loadData() {
Subscription subscription = Observable.zip(V2exService.getInstance().getV2exApi().getTopicHot(), V2exService.getInstance().getV2exApi().getTopicLatest(), new Func2<List<Topics>, List<Topics>, TopicsData>() {
@Override
public TopicsData call(List<Topics> hotTopics, List<Topics> latestTopics) {
setImagData(hotTopics);
setImagData(latestTopics);
TopicsData topicsData = new TopicsData();
topicsData.hotTopics = hotTopics;
latestTopics.addAll(0 , hotTopics);
topicsData.allTopics = latestTopics;
return topicsData;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
addSubscription(subscription);
}
MainListWithExample_Observable_flatMapIterable.java 文件源码
项目:RxJavaDemo
阅读 22
收藏 0
点赞 0
评论 0
private Observable example2() {
// flatMapIterable(Func1, Func2)
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMapIterable(
new Func1<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Integer integer) {
ArrayList<Integer> s = new ArrayList<>();
for (int i = 0; i < integer; i++) {
s.add(i);
}
return s;
}
}, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer o, Integer o2) {
return o + o2;
}
}
);
}
NodeMerger.java 文件源码
项目:microservices-dashboard-server
阅读 17
收藏 0
点赞 0
评论 0
public static Func2<List<Node>, Node, List<Node>> merge() {
return (mergedNodes, node) -> {
// TODO: we should be able to enrich nodes in a general way before merging, eg. convert all service names to lowercase
// Aggregator specific logic should not come here, eg. removing the Eureka description
Optional<Integer> nodeIndex = mergedNodes.stream()
.filter(n -> n.getId().equalsIgnoreCase(node.getId()))
.map(mergedNodes::indexOf)
.findFirst();
if (nodeIndex.isPresent()) {
logger.info("Node with id '{}' previously added, merging", node.getId());
mergedNodes.get(nodeIndex.get()).mergeWith(node);
} else {
logger.info("Node with id '{}' was not merged before, adding it to the list", node.getId());
mergedNodes.add(node);
}
return mergedNodes;
};
}
AirportsFragment.java 文件源码
项目:AirportCodes-Android
阅读 18
收藏 0
点赞 0
评论 0
@Override
protected Observable<List<Airport>> createObservable()
{
copyLocalFileIfNone();
return AirportPersister.INSTANCE.getAirports()
.lift(new FlattenOperator<Airport>())
.toSortedList(new Func2<Airport, Airport, Integer>()
{
@Override
public Integer call(Airport airport1, Airport airport2)
{
return airport1.code.compareTo(airport2.code);
}
});
}
ZipFragment.java 文件源码
项目:FMTech
阅读 20
收藏 0
点赞 0
评论 0
@OnClick(R.id.zipLoadBt)
void loadData(){
mSwipeRefreshLayout.setRefreshing(true);
unsubscribe();
mSubscription = Observable.zip(NetWorkService.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()),
NetWorkService.getZbApi().search("装逼"), new Func2<List<Image>, List<Image>, List<Image>>() {
@Override
public List<Image> call(List<Image> images1, List<Image> images2) {
List<Image> images = new ArrayList<Image>();
for(int i = 0; i < images1.size()/2 && i < images2.size(); i++){
images.add(images1.get(i * 2));
images.add(images1.get(i * 2 + 1));
images.add(images2.get(i));
}
return images;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mObserver);
}
BigSort.java 文件源码
项目:bigsort
阅读 18
收藏 0
点赞 0
评论 0
public static <T, Resource> Observable<T> sort(Observable<T> source,
final Comparator<T> comparator,
final Func2<Observable<T>, Resource, Observable<Resource>> writer,
final Func1<Resource, Observable<T>> reader, final Func0<Resource> resourceFactory,
final Action1<Resource> resourceDisposer, int maxToSortInMemoryPerThread,
final int maxTempResources, Scheduler scheduler) {
Preconditions.checkArgument(maxToSortInMemoryPerThread > 0,
"maxToSortInMemoryPerThread must be greater than 0");
Preconditions.checkArgument(maxTempResources >= 2, "maxTempResources must be at least 2");
return source
// buffer into groups small enough to sort in memory
.buffer(maxToSortInMemoryPerThread)
// sort each buffer to a resource
.flatMap(sortInMemoryAndWriteToAResource(comparator, writer, resourceFactory,
scheduler))
// reduce by merging groups of resources to a single resource
// once the resource count is maxTempResources
.lift(new OperatorResourceMerger<Resource, T>(comparator, writer, reader,
resourceFactory, resourceDisposer, maxTempResources))
// help out backpressure because ResourceMerger doesn't support
// yet
.onBackpressureBuffer()
// emit the contents of the last file in the reduction process
.flatMap(reader);
}
GoogleSearchController.java 文件源码
项目:FloatingSearchView
阅读 20
收藏 0
点赞 0
评论 0
private Observable<SearchResult[]> getQueryObservable(String query) {
return mSearch.search(query)
.flatMap(new Func1<Response, Observable<SearchResult[]>>() {
@Override
public Observable<SearchResult[]> call(Response response) {
if (response.responseData == null)
return Observable.error(new SearchException(response.responseDetails));
return Observable.just(response.responseData.results);
}
})
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
return throwable instanceof InterruptedIOException;
}
});
}
OperatorCompareTest.java 文件源码
项目:rx-extended
阅读 20
收藏 0
点赞 0
评论 0
@Test
public void testCompareOperatorInitialValue() throws Exception {
@SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);
Observable<Integer> observable = Observable.just(1, 2, 3);
OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(10, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
observable.lift(operatorCompare).subscribe(observer);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onNext(11);
verify(observer, times(1)).onNext(3);
verify(observer, times(1)).onNext(5);
verify(observer, times(1)).onCompleted();
}
OperatorCompareTest.java 文件源码
项目:rx-extended
阅读 17
收藏 0
点赞 0
评论 0
@Test
public void testCompareOperatorNoInitialValue() throws Exception {
@SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);
Observable<Integer> observable = Observable.just(1, 2, 3);
OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
observable.lift(operatorCompare).subscribe(observer);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onNext(1);
verify(observer, times(1)).onNext(3);
verify(observer, times(1)).onNext(5);
verify(observer, times(1)).onCompleted();
}