java类rx.functions.Func2的实例源码

MockLocationsActivity.java 文件源码 项目:GitHub 阅读 59 收藏 0 点赞 0 评论 0
/**
 * Switches mock-location mode on or off.
 *
 * <p>When switched on, the result of registering the mock locations with the
 * provider is zipped with the mock locations themselves, and each location is
 * rendered as text followed by a running counter on {@code mockLocationView}.
 * When switched off, the current subscription (if any) is released.
 *
 * @param toggle {@code true} to start mocking, {@code false} to stop
 */
private void setMockMode(boolean toggle) {
    // Release whatever is left from a previous call. This prevents a
    // NullPointerException when mock mode is switched off before it was ever
    // switched on, and prevents leaking the old subscription when it is
    // switched on twice in a row. Unsubscribing an already-unsubscribed
    // Subscription is harmless.
    if (mockLocationSubscription != null) {
        mockLocationSubscription.unsubscribe();
    }
    if (!toggle) {
        return;
    }

    mockLocationSubscription =
            Observable.zip(locationProvider.mockLocation(mockLocationObservable),
                    mockLocationObservable, new Func2<Status, Location, String>() {
                        // Number of locations rendered so far by this subscription.
                        int count = 0;

                        @Override
                        public String call(Status result, Location location) {
                            return new LocationToStringFunc().call(location) + " " + count++;
                        }
                    })
                    .subscribe(new DisplayTextOnViewAction(mockLocationView), new ErrorHandler());
}
RetryWhenNetworkException.java 文件源码 项目:Bailan 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Maps the stream of errors to a stream of retry signals.
 *
 * <p>Each error is paired with its 1-based attempt number. Connection and
 * timeout errors within the retry budget produce a timer whose delay is
 * {@code delay + (attempt - 1) * increaseDelay} milliseconds; every other
 * case re-emits the original error.
 *
 * @param observable the errors emitted by the source
 * @return an observable that emits after the back-off delay, or fails with the original error
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    // Zip rule: the combined result is an Observable<Wrapper>.
    final Func2<Throwable, Integer, Wrapper> pairWithAttempt = new Func2<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper call(Throwable error, Integer attempt) {
            return new Wrapper(error, attempt);
        }
    };

    // Conversion rule: turn each (error, attempt) pair into a delay or a failure.
    final Func1<Wrapper, Observable<?>> retryOrFail = new Func1<Wrapper, Observable<?>>() {
        @Override
        public Observable<?> call(Wrapper failed) {
            final boolean networkFailure = failed.throwable instanceof ConnectException
                    || failed.throwable instanceof SocketTimeoutException
                    || failed.throwable instanceof TimeoutException;
            // Once the retry budget is used up the error is propagated as well;
            // otherwise the stream would by default end in onCompleted.
            if (!networkFailure || failed.index >= count + 1) {
                return Observable.error(failed.throwable);
            }
            return Observable.timer(delay + (failed.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
        }
    };

    return observable
            .zipWith(Observable.range(1, count + 1), pairWithAttempt)
            .flatMap(retryOrFail);
}
RetryWhenNetworkException.java 文件源码 项目:RxRetrofit-tokean 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Converts source errors into retry triggers with a linearly growing delay.
 *
 * @param observable stream of errors raised by the source
 * @return a stream that emits after {@code delay + (n - 1) * increaseDelay} ms for the
 *         n-th retryable error, or fails with the error when it is not retryable or
 *         the attempt number reaches {@code count + 1}
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable cause, Integer attemptNumber) {
                    return new Wrapper(cause, attemptNumber);
                }
            })
            .flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper attempt) {
                    // Anything that is not a connection/timeout problem fails immediately.
                    if (!(attempt.throwable instanceof ConnectException)
                            && !(attempt.throwable instanceof SocketTimeoutException)
                            && !(attempt.throwable instanceof TimeoutException)) {
                        return Observable.error(attempt.throwable);
                    }
                    // Also propagate the error once the retry budget is exceeded;
                    // otherwise the stream would by default end in onCompleted.
                    if (attempt.index >= count + 1) {
                        return Observable.error(attempt.throwable);
                    }
                    return Observable.timer(delay + (attempt.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
                }
            });
}
RetryWhenNetWorkException.java 文件源码 项目:TestChat 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Turns each source error into either a delayed retry signal or a terminal error.
 *
 * <p>Errors are numbered from 1. A {@code ConnectException}, {@code SocketException}
 * or {@code TimeoutException} whose number is below {@code retryCount + 1} yields a
 * timer of {@code delayTime + (number - 1) * delayTime} milliseconds; anything else
 * re-emits the error.
 *
 * @param observable the errors emitted by the source
 * @return the retry-signal stream
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
        final Func2<Throwable, Integer, ExceptionWrapper> numberErrors =
                new Func2<Throwable, Integer, ExceptionWrapper>() {
                        @Override
                        public ExceptionWrapper call(Throwable error, Integer number) {
                                return new ExceptionWrapper(number, error);
                        }
                };

        final Func1<ExceptionWrapper, Observable<?>> backOffOrFail =
                new Func1<ExceptionWrapper, Observable<?>>() {
                        @Override
                        public Observable<?> call(ExceptionWrapper wrapped) {
                                boolean retryable = wrapped.throwable instanceof ConnectException
                                        || wrapped.throwable instanceof SocketException
                                        || wrapped.throwable instanceof TimeoutException;
                                if (retryable && wrapped.index < retryCount + 1) {
                                        return Observable.timer(delayTime + (wrapped.index - 1) * delayTime, java.util.concurrent.TimeUnit.MILLISECONDS);
                                }
                                return Observable.error(wrapped.throwable);
                        }
                };

        return observable.zipWith(Observable.range(1, retryCount + 1), numberErrors).flatMap(backOffOrFail);
}
WebAppBaseImpl.java 文件源码 项目:azure-libraries-for-java 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Combines the app settings with the slot configuration names into a map of
 * {@link AppSetting} keyed by setting name.
 *
 * @return an observable emitting the map view, or {@code null} when the settings
 *         response or its properties are absent
 */
@Override
public Observable<Map<String, AppSetting>> getAppSettingsAsync() {
    final Func2<StringDictionaryInner, SlotConfigNamesResourceInner, Map<String, AppSetting>> toSettingsMap =
            new Func2<StringDictionaryInner, SlotConfigNamesResourceInner, Map<String, AppSetting>>() {
        @Override
        public Map<String, AppSetting> call(final StringDictionaryInner appSettingsInner, final SlotConfigNamesResourceInner slotConfigs) {
            final boolean hasProperties = appSettingsInner != null && appSettingsInner.properties() != null;
            if (!hasProperties) {
                return null;
            }
            // Maps.asMap builds a view: each AppSetting is created when its key is looked up.
            return Maps.asMap(appSettingsInner.properties().keySet(), new Function<String, AppSetting>() {
                @Override
                public AppSetting apply(String key) {
                    // A setting is flagged when the slot configuration lists its name.
                    return new AppSettingImpl(
                            key,
                            appSettingsInner.properties().get(key),
                            slotConfigs != null
                                    && slotConfigs.appSettingNames() != null
                                    && slotConfigs.appSettingNames().contains(key));
                }
            });
        }
    };
    return Observable.zip(listAppSettings(), listSlotConfigurations(), toSettingsMap);
}
WebAppBaseImpl.java 文件源码 项目:azure-libraries-for-java 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Combines the connection strings with the slot configuration names into a map
 * of {@link ConnectionString} keyed by connection-string name.
 *
 * @return an observable emitting the map view, or {@code null} when the
 *         connection-string response or its properties are absent
 */
@Override
public Observable<Map<String, ConnectionString>> getConnectionStringsAsync() {
    return Observable.zip(
            listConnectionStrings(),
            listSlotConfigurations(),
            new Func2<ConnectionStringDictionaryInner, SlotConfigNamesResourceInner, Map<String, ConnectionString>>() {
                @Override
                public Map<String, ConnectionString> call(final ConnectionStringDictionaryInner connectionStringsInner, final SlotConfigNamesResourceInner slotConfigs) {
                    if (connectionStringsInner != null && connectionStringsInner.properties() != null) {
                        // Maps.asMap builds a view: values are created when a key is looked up.
                        return Maps.asMap(connectionStringsInner.properties().keySet(), new Function<String, ConnectionString>() {
                            @Override
                            public ConnectionString apply(String name) {
                                // Flagged when the slot configuration lists this connection string's name.
                                return new ConnectionStringImpl(
                                        name,
                                        connectionStringsInner.properties().get(name),
                                        slotConfigs != null
                                                && slotConfigs.connectionStringNames() != null
                                                && slotConfigs.connectionStringNames().contains(name));
                            }
                        });
                    }
                    return null;
                }
            });
}
AppService.java 文件源码 项目:disclosure-android-app 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Returns the comparison function that orders {@link AppReport}s by the given criterion.
 *
 * @param sortBy the sort criterion
 * @return a comparator-style function for that criterion
 * @throws IllegalArgumentException if no sorting function exists for {@code sortBy}
 */
public Func2<AppReport, AppReport, Integer> getSortingFunction(SortBy sortBy) {
  switch (sortBy) {
    case NAME:
      return new SortByName();
    case LIBRARY_COUNT:
      return new SortByLibraryCount();
    case ANALYZED_AT:
      return new SortByAnalyzedAt();
    case PERMISSION_COUNT:
      return new SortByPermissionCount();
    default:
      // The message is built by concatenation, so the former "%s" placeholder
      // was printed literally; it has been removed.
      throw new IllegalArgumentException("no sorting function for " + sortBy);
  }
}
RxIdlingResource.java 文件源码 项目:GongXianSheng 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Installs RxJava hooks that keep the active-subscription counter up to date:
 * the counter is incremented when an observable starts, and decremented when
 * subscribing fails or when the subscription is returned.
 */
private void setupHooks() {
    // Start of a subscription: one more active subscription.
    final Func2<Observable, OnSubscribe, OnSubscribe> countOnStart =
            new Func2<Observable, OnSubscribe, OnSubscribe>() {
                @Override
                public OnSubscribe call(Observable source, OnSubscribe original) {
                    incrementActiveSubscriptionsCount();
                    return original;
                }
            };

    // Subscribing threw: undo the increment and pass the error through unchanged.
    final Func1<Throwable, Throwable> uncountOnError = new Func1<Throwable, Throwable>() {
        @Override
        public Throwable call(Throwable error) {
            decrementActiveSubscriptionsCount();
            return error;
        }
    };

    // Subscription handed back to the caller: undo the increment.
    final Func1<Subscription, Subscription> uncountOnReturn = new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription returned) {
            decrementActiveSubscriptionsCount();
            return returned;
        }
    };

    RxJavaHooks.setOnObservableStart(countOnStart);
    RxJavaHooks.setOnObservableSubscribeError(uncountOnError);
    RxJavaHooks.setOnObservableReturn(uncountOnReturn);
}
OperatorSequenceEqual.java 文件源码 项目:boohee_v5.6 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Tests whether two observable sequences emit equal items in the same order
 * and complete at the same position.
 *
 * <p>Both sequences are materialized so that completion is represented by the
 * {@code LOCAL_ONCOMPLETED} marker, then zipped pairwise. A pair is equal when
 * both sides are the completion marker, unequal when only one side is, and
 * otherwise decided by {@code equality}. The result is {@code true} only if
 * every pair is equal.
 *
 * @param first    the first sequence
 * @param second   the second sequence
 * @param equality function deciding whether two items are equal
 * @param <T>      the item type
 * @return an observable emitting a single boolean
 */
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, final Func2<? super T, ? super T, Boolean> equality) {
    return Observable.zip(materializeLite(first), materializeLite(second), new Func2<Object, Object, Boolean>() {
        @Override
        @SuppressWarnings("unchecked")
        public Boolean call(Object t1, Object t2) {
            final boolean firstCompleted = t1 == OperatorSequenceEqual.LOCAL_ONCOMPLETED;
            final boolean secondCompleted = t2 == OperatorSequenceEqual.LOCAL_ONCOMPLETED;

            if (firstCompleted && secondCompleted) {
                return Boolean.TRUE;
            }
            if (firstCompleted || secondCompleted) {
                // One sequence ended before the other: lengths differ.
                return Boolean.FALSE;
            }
            // Neither value is the completion marker, so both are items of type T.
            // The explicit casts are required because Object is not assignable
            // to the "? super T" parameters of the equality function.
            return equality.call((T) t1, (T) t2);
        }
    }).all(UtilityFunctions.identity());
}
PostItemsListFragment.java 文件源码 项目:mimi-reader 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Builds a function that removes hidden threads from a catalog.
 *
 * <p>The returned function replaces the catalog's post list with a new list
 * containing only the posts whose number does not match any hidden thread id.
 * The catalog instance itself is modified and returned. A {@code null} catalog
 * or a {@code null} hidden-thread list is passed through untouched.
 *
 * @return the filtering function
 */
private Func2<ChanCatalog, List<HiddenThread>, ChanCatalog> hideThreads() {
    return new Func2<ChanCatalog, List<HiddenThread>, ChanCatalog>() {
        @Override
        public ChanCatalog call(ChanCatalog chanCatalog, List<HiddenThread> hiddenThreads) {
            // Nothing to filter (or nothing to filter with): avoid the
            // NullPointerException the inner loop would otherwise raise.
            if (chanCatalog == null || hiddenThreads == null) {
                return chanCatalog;
            }

            List<ChanPost> visiblePosts = new ArrayList<>();
            for (ChanPost post : chanCatalog.getPosts()) {
                if (!isHidden(post, hiddenThreads)) {
                    visiblePosts.add(post);
                }
            }
            chanCatalog.setPosts(visiblePosts);
            return chanCatalog;
        }

        /** Returns whether the post's number matches one of the hidden thread ids. */
        private boolean isHidden(ChanPost post, List<HiddenThread> hiddenThreads) {
            for (HiddenThread hiddenThread : hiddenThreads) {
                if (hiddenThread.threadId == post.getNo()) {
                    // Stop at the first match instead of scanning the rest of the list.
                    return true;
                }
            }
            return false;
        }
    };
}
RxJavaCallAdapterFactory.java 文件源码 项目:HttpService 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Converts source errors into retry signals with exponential back-off.
 *
 * <p>Each error is paired with a counter taken from
 * {@code Observable.range(INITIAL, maxConnectCount)}. When the counter equals
 * {@code maxConnectCount} the original error is propagated; otherwise a timer of
 * {@code 2^counter} seconds is emitted.
 *
 * @param errorObservable the errors emitted by the source
 * @return a stream that emits after the back-off delay, or fails with the original error
 */
@Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
  return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
      new Func2<Throwable, Integer, InnerThrowable>() {

        @Override public InnerThrowable call(Throwable throwable, Integer i) {
          // The former "instanceof IOException" check had identical branches,
          // so every error is wrapped the same way.
          // NOTE(review): if only IOExceptions were meant to be retried, that
          // filtering still has to be implemented — confirm the intent.
          return new InnerThrowable(throwable, i);
        }
      }).concatMap(new Func1<InnerThrowable, Observable<Long>>() {
    @Override public Observable<Long> call(InnerThrowable innerThrowable) {

      Integer currentCount = innerThrowable.getCurrentRetryCount();
      // Retry budget exhausted: give up with the original error.
      if (RetryWhenFunc.this.maxConnectCount.equals(currentCount)) {
        return Observable.error(innerThrowable.getThrowable());
      }

      /*use Schedulers#immediate() to keep on same thread */
      return Observable.timer((long) Math.pow(2, currentCount), TimeUnit.SECONDS,
          Schedulers.immediate());
    }
  });
}
CommentPresenter.java 文件源码 项目:zhihudailysoap 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Loads the short and the long comments of a news item in parallel and
 * delivers them together to {@code observer} on the main thread.
 *
 * @param newsID id of the news item whose comments are fetched
 */
@Override
public void fetchDataByNetWork(final int newsID) {
    Observable<Comments> short_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).shortComment(newsID);
    Observable<Comments> long_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).longComment(newsID);
    Observable.zip(short_comments_service, long_comments_service, new Func2<Comments, Comments, Comments1>(){

        @Override
        public Comments1 call(Comments shortComments, Comments longComments) {
            // zip passes results in argument order: short comments first, long
            // comments second. The two were previously assigned the wrong way round.
            Comments1 comments1 = new Comments1();
            comments1.short_comments = shortComments.comments;
            comments1.long_comments = longComments.comments;
            return comments1;
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);
}
GankPresenter.java 文件源码 项目:yApp 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Builds the observable that loads the current page of Gank items and emits
 * them as one list sorted by publication date, newest first.
 *
 * @return the observable (subscribed on the io scheduler, observed on the main
 *         thread), or {@code null} when {@code resId} is {@code -1}
 */
@Override
public Observable<List<Gank>> getObservable() {
    if (resId == -1) {
        return null;
    }

    // Extracts the result list from the response envelope.
    final Func1<GankData, List<Gank>> unwrapResults = new Func1<GankData, List<Gank>>() {
        @Override
        public List<Gank> call(GankData envelope) {
            return envelope.getResults();
        }
    };
    // Emits the list's items one by one so they can be re-collected in sorted order.
    final Func1<List<Gank>, Observable<Gank>> flatten = new Func1<List<Gank>, Observable<Gank>>() {
        @Override
        public Observable<Gank> call(List<Gank> items) {
            return Observable.from(items);
        }
    };
    // Reversed comparison: later publication dates sort first.
    final Func2<Gank, Gank, Integer> newestFirst = new Func2<Gank, Gank, Integer>() {
        @Override
        public Integer call(Gank left, Gank right) {
            return right.getPublishedAt().compareTo(left.getPublishedAt());
        }
    };

    return getDataSupports().getGankData(type, mCurrentPage, PAGE_SIZE)
            .map(unwrapResults)
            .flatMap(flatten)
            .toSortedList(newestFirst)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
DataManager.java 文件源码 项目:Mondroid 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Loads the balance and the transactions of the first account.
 *
 * @return a single emitting the combined {@link AccountData}, or an empty
 *         {@code AccountData} when the accounts response contains no account
 */
public Single<AccountData> getAccountDetails() {
    return getAccounts()
            .flatMap(new Func1<AccountsResponse, Single<? extends AccountData>>() {
                @Override
                public Single<? extends AccountData> call(AccountsResponse response) {
                    // No account available: fall back to empty account data.
                    if (response.accounts.isEmpty()) {
                        return Single.just(new AccountData());
                    }

                    // Only the first account is used.
                    String firstAccountId = response.accounts.get(0).id;
                    return Single.zip(
                            getBalance(firstAccountId),
                            getTransactions(firstAccountId),
                            new Func2<Balance, List<Transaction>, AccountData>() {
                                @Override
                                public AccountData call(Balance currentBalance,
                                                        List<Transaction> history) {
                                    return new AccountData(currentBalance, history);
                                }
                            });
                }
            });
}
ApiPaginator.java 文件源码 项目:android-oss 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Stores the paginator's collaborators and wires up its two output streams.
 *
 * <p>{@code paginatedData} switches to a fresh {@code dataWithPagination} stream each
 * time {@code startOverWith} emits. {@code loadingPage} restarts, on each
 * {@code startOverWith} emission, a scan over {@code nextPage} that starts at 1 and
 * adds one per {@code nextPage} emission.
 */
private ApiPaginator(
  final @NonNull Observable<Void> nextPage,
  final @NonNull Observable<Params> startOverWith,
  final @NonNull Func1<Envelope, List<Data>> envelopeToListOfData,
  final @NonNull Func1<Params, Observable<Envelope>> loadWithParams,
  final @NonNull Func1<String, Observable<Envelope>> loadWithPaginationPath,
  final @NonNull Func1<Envelope, String> envelopeToMoreUrl,
  final @NonNull Func1<List<Data>, List<Data>> pageTransformation,
  final boolean clearWhenStartingOver,
  final @NonNull Func2<List<Data>, List<Data>, List<Data>> concater,
  final boolean distinctUntilChanged
) {
  // Collaborators, in parameter order.
  this.nextPage = nextPage;
  this.startOverWith = startOverWith;
  this.envelopeToListOfData = envelopeToListOfData;
  this.loadWithParams = loadWithParams;
  this.loadWithPaginationPath = loadWithPaginationPath;
  this.envelopeToMoreUrl = envelopeToMoreUrl;
  this.pageTransformation = pageTransformation;
  this.clearWhenStartingOver = clearWhenStartingOver;
  this.concater = concater;
  this.distinctUntilChanged = distinctUntilChanged;

  // Derived streams.
  this.paginatedData = this.startOverWith.switchMap(params -> this.dataWithPagination(params));
  this.loadingPage = this.startOverWith.switchMap(
    ignoredParams -> nextPage.scan(1, (pageNumber, ignoredSignal) -> pageNumber + 1)
  );
}
GoogleSearchController.java 文件源码 项目:FloatingSearchView 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Runs a search and emits its results.
 *
 * <p>A response without data is turned into a {@code SearchException} carrying the
 * response details. The search is retried only when it fails with an
 * {@code InterruptedIOException}.
 *
 * @param query the search query
 * @return an observable emitting the result array
 */
private Observable<SearchResult[]> getQueryObservable(String query) {
    final Func1<Response, Observable<SearchResult[]>> toResults =
            new Func1<Response, Observable<SearchResult[]>>() {
                @Override
                public Observable<SearchResult[]> call(Response reply) {
                    if (reply.responseData != null) {
                        return Observable.just(reply.responseData.results);
                    }
                    return Observable.error(new SearchException(reply.responseDetails));
                }
            };

    // The attempt number is ignored; only the error type decides.
    final Func2<Integer, Throwable, Boolean> retryIfInterrupted = new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer attempt, Throwable error) {
            return error instanceof InterruptedIOException;
        }
    };

    return mSearch.search(query).flatMap(toResults).retry(retryIfInterrupted);
}
RetryWhenNetworkException.java 文件源码 项目:RxjavaRetrofitDemo-string-master 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Maps source errors to retry signals.
 *
 * <p>Errors are numbered from 1. Connection and timeout errors numbered below
 * {@code count + 1} wait {@code delay + (number - 1) * increaseDelay} milliseconds
 * before signalling; all other errors are propagated.
 *
 * @param observable the errors emitted by the source
 * @return the retry-signal stream
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    final Func2<Throwable, Integer, Wrapper> numberTheError = new Func2<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper call(Throwable failure, Integer number) {
            return new Wrapper(failure, number);
        }
    };

    return observable
            .zipWith(Observable.range(1, count + 1), numberTheError)
            .flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper numbered) {
                    final boolean connectionProblem = numbered.throwable instanceof ConnectException
                            || numbered.throwable instanceof SocketTimeoutException
                            || numbered.throwable instanceof TimeoutException;
                    // Past the retry limit the error is thrown too; otherwise the
                    // stream would by default finish with onCompleted.
                    final boolean withinLimit = numbered.index < count + 1;
                    if (connectionProblem && withinLimit) {
                        return Observable.timer(delay + (numbered.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
                    }
                    return Observable.error(numbered.throwable);
                }
            });
}
RetryWhenNetworkException.java 文件源码 项目:RxjavaRetrofitDemo-master 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Decides for each source error whether to retry after a delay or to fail.
 *
 * @param observable the errors emitted by the source
 * @return a stream emitting after {@code delay + (attempt - 1) * increaseDelay} ms for
 *         retryable errors within the budget, and failing with the error otherwise
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    // Pairs every error with its 1-based attempt number.
    final Func2<Throwable, Integer, Wrapper> tagAttempt = new Func2<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper call(Throwable problem, Integer attemptNo) {
            return new Wrapper(problem, attemptNo);
        }
    };

    final Func1<Wrapper, Observable<?>> decide = new Func1<Wrapper, Observable<?>>() {
        @Override
        public Observable<?> call(Wrapper tagged) {
            // Errors that are not connection/timeout related are never retried.
            if (!(tagged.throwable instanceof ConnectException
                    || tagged.throwable instanceof SocketTimeoutException
                    || tagged.throwable instanceof TimeoutException)) {
                return Observable.error(tagged.throwable);
            }
            // Beyond the retry limit the error is propagated as well; otherwise
            // the stream would by default end in onCompleted.
            if (!(tagged.index < count + 1)) {
                return Observable.error(tagged.throwable);
            }
            return Observable.timer(delay + (tagged.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
        }
    };

    return observable.zipWith(Observable.range(1, count + 1), tagAttempt).flatMap(decide);
}
RetryWhenNetworkException.java 文件源码 项目:collapselrecycler 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Produces the retry schedule for failing requests.
 *
 * <p>The n-th error (n starting at 1) triggers a retry after
 * {@code delay + (n - 1) * increaseDelay} milliseconds when it is a
 * {@code ConnectException}, {@code SocketTimeoutException} or
 * {@code TimeoutException} and {@code n < count + 1}; otherwise the error is emitted.
 *
 * @param observable the errors emitted by the source
 * @return the retry-signal stream
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(
                    Observable.range(1, count + 1),
                    new Func2<Throwable, Integer, Wrapper>() {
                        @Override
                        public Wrapper call(Throwable raised, Integer ordinal) {
                            return new Wrapper(raised, ordinal);
                        }
                    })
            .flatMap(
                    new Func1<Wrapper, Observable<?>>() {
                        @Override
                        public Observable<?> call(Wrapper entry) {
                            boolean retry = false;
                            if (entry.throwable instanceof ConnectException
                                    || entry.throwable instanceof SocketTimeoutException
                                    || entry.throwable instanceof TimeoutException) {
                                // Exceeding the retry count must also raise the error;
                                // otherwise the stream would by default go to onCompleted.
                                retry = entry.index < count + 1;
                            }
                            if (!retry) {
                                return Observable.error(entry.throwable);
                            }
                            return Observable.timer(delay + (entry.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
                        }
                    });
}
RetryWhenNetworkException.java 文件源码 项目:Rx-Retrofit 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Translates the source's errors into delayed retry signals or a final error.
 *
 * @param observable the errors emitted by the source
 * @return a stream that waits {@code delay + (index - 1) * increaseDelay} ms for each
 *         retryable network error with {@code index < count + 1}, and otherwise
 *         fails with the received error
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    final Func1<Wrapper, Observable<?>> scheduleRetry = new Func1<Wrapper, Observable<?>>() {
        @Override
        public Observable<?> call(Wrapper item) {
            final boolean isNetworkError = item.throwable instanceof ConnectException
                    || item.throwable instanceof SocketTimeoutException
                    || item.throwable instanceof TimeoutException;
            // If the retry count is exceeded the error is thrown as well;
            // otherwise the default outcome would be onCompleted.
            if (!isNetworkError || item.index >= count + 1) {
                return Observable.error(item.throwable);
            }
            return Observable.timer(delay + (item.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
        }
    };

    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable thrown, Integer position) {
                    return new Wrapper(thrown, position);
                }
            })
            .flatMap(scheduleRetry);
}
RetryWhenNetworkException.java 文件源码 项目:RxRetrofit-mvp 阅读 28 收藏 0 点赞 0 评论 0
/**
 * Builds the retry-signal stream from the stream of errors.
 *
 * <p>Every error is wrapped together with a running index starting at 1. A
 * connection or timeout error whose index is below {@code count + 1} yields a timer
 * of {@code delay + (index - 1) * increaseDelay} milliseconds; any other wrapper
 * yields its error.
 *
 * @param observable the errors emitted by the source
 * @return the retry-signal stream
 */
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    final Func2<Throwable, Integer, Wrapper> wrapWithIndex = new Func2<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper call(Throwable exception, Integer runningIndex) {
            return new Wrapper(exception, runningIndex);
        }
    };
    final Func1<Wrapper, Observable<?>> timerOrError = new Func1<Wrapper, Observable<?>>() {
        @Override
        public Observable<?> call(Wrapper wrapped) {
            if (wrapped.index < count + 1) {
                // Within the retry budget: only connection/timeout errors are retried.
                if (wrapped.throwable instanceof ConnectException
                        || wrapped.throwable instanceof SocketTimeoutException
                        || wrapped.throwable instanceof TimeoutException) {
                    return Observable.timer(delay + (wrapped.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
                }
            }
            // Budget exceeded or non-retryable error: propagate it; otherwise the
            // stream would by default end in onCompleted.
            return Observable.error(wrapped.throwable);
        }
    };
    return observable.zipWith(Observable.range(1, count + 1), wrapWithIndex).flatMap(timerOrError);
}
NewestTopicsFragment.java 文件源码 项目:V2EX-Android 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Loads the hot and the latest topics together and hands the combined result to
 * {@code observer} on the main thread. The subscription is registered through
 * {@code addSubscription}.
 */
@Override
public void loadData() {
    final Func2<List<Topics>, List<Topics>, TopicsData> combine = new Func2<List<Topics>, List<Topics>, TopicsData>() {
        @Override
        public TopicsData call(List<Topics> hot, List<Topics> latest) {
            setImagData(hot);
            setImagData(latest);

            TopicsData combined = new TopicsData();
            combined.hotTopics = hot;
            // The hot topics are inserted in front of the latest ones; the
            // "latest" list is modified in place and reused as the full list.
            latest.addAll(0 , hot);
            combined.allTopics = latest;
            return combined;
        }
    };

    Subscription pending = Observable
            .zip(V2exService.getInstance().getV2exApi().getTopicHot(), V2exService.getInstance().getV2exApi().getTopicLatest(), combine)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
    addSubscription(pending);
}
MainListWithExample_Observable_flatMapIterable.java 文件源码 项目:RxJavaDemo 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Demonstrates {@code flatMapIterable(Func1, Func2)}: every source number n in 1..9
 * is expanded to the values 0..n-1, and each expanded value is combined with its
 * source number by addition.
 *
 * @return the resulting observable
 */
private Observable example2() {
    // Expands n into the list [0, 1, ..., n - 1].
    final Func1<Integer, Iterable<Integer>> expand = new Func1<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(Integer upperBound) {
            ArrayList<Integer> values = new ArrayList<>();
            int next = 0;
            while (next < upperBound) {
                values.add(next);
                next++;
            }
            return values;
        }
    };

    // Combines the source number with one expanded value.
    final Func2<Integer, Integer, Integer> add = new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer source, Integer expanded) {
            return source + expanded;
        }
    };

    // flatMapIterable(Func1, Func2)
    return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).flatMapIterable(expand, add);
}
NodeMerger.java 文件源码 项目:microservices-dashboard-server 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Returns an accumulator function that merges a node into a list of already
 * merged nodes.
 *
 * <p>If the list contains a node with the same id (compared case-insensitively),
 * the new node is merged into the first such node; otherwise it is appended.
 * The list passed in is modified and returned.
 *
 * @return the merge function
 */
public static Func2<List<Node>, Node, List<Node>> merge() {
    return (mergedNodes, node) -> {

        // TODO: we should be able to enrich nodes in a general way before merging, eg. convert all service names to lowercase
        // Aggregator specific logic should not come here, eg. removing the Eureka description

        // Use the matched node directly. Looking it up again through indexOf
        // depends on Node.equals and could select a different element, and
        // costs an extra scan of the list.
        Optional<Node> existingNode = mergedNodes.stream()
                .filter(n -> n.getId().equalsIgnoreCase(node.getId()))
                .findFirst();

        if (existingNode.isPresent()) {
            logger.info("Node with id '{}' previously added, merging", node.getId());
            existingNode.get().mergeWith(node);
        } else {
            logger.info("Node with id '{}' was not merged before, adding it to the list", node.getId());
            mergedNodes.add(node);
        }

        return mergedNodes;
    };
}
AirportsFragment.java 文件源码 项目:AirportCodes-Android 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Creates the observable that emits all airports as a single list ordered by
 * airport code. The local file is copied first if none exists.
 *
 * @return an observable emitting the sorted airport list
 */
@Override
protected Observable<List<Airport>> createObservable()
{
    copyLocalFileIfNone();

    // Orders airports by their code, using the natural ordering of the code.
    final Func2<Airport, Airport, Integer> byCode = new Func2<Airport, Airport, Integer>()
    {
        @Override
        public Integer call(Airport left, Airport right)
        {
            return left.code.compareTo(right.code);
        }
    };

    return AirportPersister.INSTANCE.getAirports()
            .lift(new FlattenOperator<Airport>())
            .toSortedList(byCode);
}
ZipFragment.java 文件源码 项目:FMTech 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Loads images from both services and interleaves them: two images from the first
 * source followed by one image from the second, for as long as both sources can
 * supply a complete group. The result is delivered to {@code mObserver} on the
 * main thread.
 */
@OnClick(R.id.zipLoadBt)
void loadData(){
    mSwipeRefreshLayout.setRefreshing(true);
    unsubscribe();

    final Func2<List<Image>, List<Image>, List<Image>> interleave = new Func2<List<Image>, List<Image>, List<Image>>() {
        @Override
        public List<Image> call(List<Image> pairsSource, List<Image> singlesSource) {
            List<Image> merged = new ArrayList<Image>();
            // Number of complete groups (2 + 1) that both lists can provide.
            int groups = Math.min(pairsSource.size() / 2, singlesSource.size());
            for (int group = 0; group < groups; group++) {
                int firstOfPair = group * 2;
                merged.add(pairsSource.get(firstOfPair));
                merged.add(pairsSource.get(firstOfPair + 1));
                merged.add(singlesSource.get(group));
            }
            return merged;
        }
    };

    mSubscription = Observable.zip(NetWorkService.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()),
            NetWorkService.getZbApi().search("装逼"), interleave)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
BigSort.java 文件源码 项目:bigsort 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Sorts a stream by splitting it into chunks, writing each sorted chunk to a
 * resource, merging the resources, and finally reading the merged resource back.
 *
 * @param source                     the items to sort
 * @param comparator                 ordering of the items
 * @param writer                     writes a stream of items to a resource
 * @param reader                     reads the items back from a resource
 * @param resourceFactory            creates a new resource
 * @param resourceDisposer           disposes of a resource
 * @param maxToSortInMemoryPerThread chunk size; must be greater than 0
 * @param maxTempResources           passed to the resource merger; must be at least 2
 * @param scheduler                  passed to the in-memory sort-and-write step
 * @return the items emitted by {@code reader} for the resulting resource
 * @throws IllegalArgumentException if one of the two limits is out of range
 */
public static <T, Resource> Observable<T> sort(Observable<T> source,
        final Comparator<T> comparator,
        final Func2<Observable<T>, Resource, Observable<Resource>> writer,
        final Func1<Resource, Observable<T>> reader, final Func0<Resource> resourceFactory,
        final Action1<Resource> resourceDisposer, int maxToSortInMemoryPerThread,
        final int maxTempResources, Scheduler scheduler) {
    Preconditions.checkArgument(maxToSortInMemoryPerThread > 0,
            "maxToSortInMemoryPerThread must be greater than 0");
    Preconditions.checkArgument(maxTempResources >= 2, "maxTempResources must be at least 2");

    // Step 1: cut the source into chunks small enough to sort in memory.
    final Observable<List<T>> chunks = source.buffer(maxToSortInMemoryPerThread);

    return chunks
            // Step 2: sort every chunk and write it to its own resource.
            .flatMap(sortInMemoryAndWriteToAResource(comparator, writer, resourceFactory,
                    scheduler))
            // Step 3: merge groups of resources into one whenever the number of
            // resources reaches maxTempResources.
            .lift(new OperatorResourceMerger<Resource, T>(comparator, writer, reader,
                    resourceFactory, resourceDisposer, maxTempResources))
            // The resource merger does not support backpressure yet, so buffer here.
            .onBackpressureBuffer()
            // Step 4: emit the contents of the last resource left by the reduction.
            .flatMap(reader);

}
GoogleSearchController.java 文件源码 项目:FloatingSearchView 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Executes the search for {@code query}.
 *
 * <p>Responses lacking {@code responseData} fail with a {@code SearchException}
 * built from {@code responseDetails}; failures caused by an
 * {@code InterruptedIOException} are retried, all others are not.
 *
 * @param query the text to search for
 * @return an observable emitting the array of results
 */
private Observable<SearchResult[]> getQueryObservable(String query) {
    return mSearch.search(query)
            .flatMap(new Func1<Response, Observable<SearchResult[]>>() {
                @Override
                public Observable<SearchResult[]> call(Response searchResponse) {
                    final boolean hasData = searchResponse.responseData != null;
                    if (!hasData) {
                        return Observable.error(new SearchException(searchResponse.responseDetails));
                    }
                    return Observable.just(searchResponse.responseData.results);
                }
            })
            .retry(new Func2<Integer, Throwable, Boolean>() {
                @Override
                public Boolean call(Integer retryNumber, Throwable cause) {
                    // Only interrupted I/O is worth another attempt; the retry number is unused.
                    return cause instanceof InterruptedIOException;
                }
            });
}
OperatorCompareTest.java 文件源码 项目:rx-extended 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Lifts an {@code OperatorCompare} built with initial value 10 and an adding
 * function over the sequence 1, 2, 3, and verifies that the observer receives
 * 11, 3 and 5 once each, then completes without error.
 */
@Test
public void testCompareOperatorInitialValue() throws Exception {
    @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);

    final Func2<Integer, Integer, Integer> sum = new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };
    OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(10, sum);

    Observable<Integer> numbers = Observable.just(1, 2, 3);
    numbers.lift(operatorCompare).subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onNext(11);
    verify(observer, times(1)).onNext(3);
    verify(observer, times(1)).onNext(5);
    verify(observer, times(1)).onCompleted();
}
OperatorCompareTest.java 文件源码 项目:rx-extended 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Lifts an {@code OperatorCompare} built without an initial value and with an
 * adding function over the sequence 1, 2, 3, and verifies that the observer
 * receives 1, 3 and 5 once each, then completes without error.
 */
@Test
public void testCompareOperatorNoInitialValue() throws Exception {
    @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);

    final Func2<Integer, Integer, Integer> sum = new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };
    OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(sum);

    Observable<Integer> numbers = Observable.just(1, 2, 3);
    numbers.lift(operatorCompare).subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(3);
    verify(observer, times(1)).onNext(5);
    verify(observer, times(1)).onCompleted();
}


问题


面经


文章

微信
公众号

扫码关注公众号