/**
 * Simulated "delete all" operation.
 *
 * <p>When {@code shouldThrowError} is set, the flag is cleared and an error Flowable carrying
 * the message {@code "deleteAll.error"} is returned. Otherwise the result of
 * {@code getAll(null, null)} is delayed by one second and converted into the size of the
 * emitted list.
 *
 * @return a Flowable emitting the list size, or an error Flowable
 */
@Override
public Flowable<Integer> deleteAll() {
    if (!shouldThrowError) {
        final Function<Optional<List<TestModel>>, Flowable<Integer>> toCount =
                new Function<Optional<List<TestModel>>, Flowable<Integer>>() {
                    @Override
                    public Flowable<Integer> apply(Optional<List<TestModel>> all) throws Exception {
                        final int count = all.get().size();
                        return Flowable.just(count);
                    }
                };
        return getAll(null, null)
                .delay(1, TimeUnit.SECONDS)
                .flatMap(toCount);
    }

    // Special case: the flag is reset because the StoreService needs to call getAll() again.
    shouldThrowError = false;
    return Flowable.error(new Exception("deleteAll.error"));
}
java类io.reactivex.functions.Function的实例源码
TestStore.java 文件源码
项目:store2store
阅读 38
收藏 0
点赞 0
评论 0
AllAppInfoFragment.java 文件源码
项目:android-study
阅读 96
收藏 0
点赞 0
评论 0
/**
 * Initialises the page, wires the adapter to the list, and starts loading the app infos.
 *
 * <p>The chain is subscribed on {@code Schedulers.io()}, calls {@code AppUtils.getAppsInfo()},
 * observes the result on a scheduler built from {@code getHandler()}, and is bound to the
 * {@code FragmentEvent.DESTROY} lifecycle event before the adapter data is reset.
 *
 * @param view not used by this method
 */
@Override public void initView(View view) {
    requestBaseInit(getPageTitle());

    mAdapter = new AllAppInfoAdapter(this, mAppInfos);
    mList.setLayoutManager(new LinearLayoutManager(getActivity()));
    mList.setAdapter(mAdapter);

    final Function<String, List<AppUtils.AppInfo>> loadApps =
            new Function<String, List<AppUtils.AppInfo>>() {
                @Override public List<AppUtils.AppInfo> apply(@NonNull String ignoredSeed) throws Exception {
                    return AppUtils.getAppsInfo();
                }
            };
    final Consumer<List<AppUtils.AppInfo>> showApps =
            new Consumer<List<AppUtils.AppInfo>>() {
                @Override public void accept(@NonNull List<AppUtils.AppInfo> loaded) throws Exception {
                    mAdapter.resetData(loaded);
                }
            };

    Observable.just("2")
            .subscribeOn(Schedulers.io())
            .map(loadApps)
            .observeOn(PausedHandlerScheduler.from(getHandler()))
            .compose(mLifecycleProvider.<List<AppUtils.AppInfo>>bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe(showApps);
}
SimpleFlowableList.java 文件源码
项目:rxtools
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Applies {@code operation}, or records it for later when {@code _batchedOperations} is set.
 *
 * <p>While holding {@code _batchingLock}: if {@code _batchedOperations} is non-null the
 * operation is appended to it and nothing else happens. Otherwise the operation is handed to
 * {@code applyUpdate}, wrapped so that it receives a fresh {@link ArrayList} copy of the list.
 *
 * @param operation function producing an {@code Update} from a list
 */
void applyOperation(final Function<List<T>, Update<T>> operation)
{
    synchronized (_batchingLock) {
        final boolean batching = _batchedOperations != null;
        if (batching) {
            _batchedOperations.add(operation);
            return;
        }
    }

    final Function<List<T>, Update<T>> onCopy = new Function<List<T>, Update<T>>() {
        @Override
        public Update<T> apply(List<T> list) throws Exception
        {
            // Hand the operation its own copy rather than the list passed in.
            final List<T> snapshot = new ArrayList<>(list);
            return operation.apply(snapshot);
        }
    };
    applyUpdate(onCopy);
}
ClientsNetworkCalls.java 文件源码
项目:GSB-2017-Android
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Requests all clients and converts the JSON response into a list of {@code Client}.
 *
 * <p>A {@code null} element yields an empty list, a JSON array is parsed with
 * {@code Client.ClientsListParser.fromJsonArray}, and any other element produces an error
 * with the message {@code "Expected a JSON Array"}. Results are observed on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @return an Observable emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    final ClientsService clientsApi = ServiceGenerator.createService(ClientsService.class);

    final Function<JsonElement, Observable<List<Client>>> parseClients =
            new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }
                    Log.i("Get All Clients" , "JSON: "+jsonElement.toString());
                    if (!jsonElement.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }
                    List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(jsonElement.getAsJsonArray());
                    return Observable.just(parsed);
                }
            };

    return clientsApi.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(parseClients)
            .observeOn(AndroidSchedulers.mainThread());
}
StorageImpl.java 文件源码
项目:vt-support
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Stores each incoming entry with an {@code INSERT OR REPLACE} statement on the TILES table.
 *
 * <p>For every entry the vector is gzip-compressed via {@code CompressUtil}; an
 * {@link IOException} from compression is rethrown through {@code Exceptions.propagate}. The
 * statement parameters are the zoom level, the column, the row passed through {@code flipY},
 * and the compressed bytes. A failure of the update is converted into a {@code StorageResult}
 * carrying the entry and the wrapped throwable.
 *
 * @param entries the entries to store
 * @return one {@code StorageResult} per emitted update count
 */
@Override
public Observable<StorageResult> put(Observable<Entry> entries) {
    return entries.flatMap((Function<Entry, ObservableSource<StorageResult>>) entry -> {
        byte[] gzippedTile;
        try {
            gzippedTile = CompressUtil.getCompressedAsGzip(entry.getVector());
        } catch (final IOException compressionFailure) {
            throw Exceptions.propagate(compressionFailure);
        }

        final String upsertSql =
                "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
                        + " values (?, ?, ?, ?);";

        Observable<Object> bindValues = Observable.<Object>just(
                entry.getZoomLevel(),
                entry.getColumn(),
                flipY(entry.getRow(), entry.getZoomLevel()),
                gzippedTile);

        return dataSource.update(upsertSql)
                .parameterStream(bindValues.toFlowable(BackpressureStrategy.BUFFER))
                .counts()
                .map(rowCount -> new StorageResult(entry))
                .onErrorReturn(failure -> new StorageResult(entry, new Exception(failure)))
                .toObservable();
    });
}
MainPresenter.java 文件源码
项目:GitHub
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Subscribes to {@code NightModeEvent}s from the {@code RxBus} and forwards each event's
 * night-mode flag to {@code mView.useNightMode}. The resulting subscription is registered
 * through {@code addSubscribe}.
 */
void registerEvent() {
    final Function<NightModeEvent, Boolean> toNightFlag =
            new Function<NightModeEvent, Boolean>() {
                @Override
                public Boolean apply(NightModeEvent event) {
                    return event.getNightMode();
                }
            };

    addSubscribe(
            RxBus.getDefault().toFlowable(NightModeEvent.class)
                    .compose(RxUtil.<NightModeEvent>rxSchedulerHelper())
                    .map(toNightFlag)
                    .subscribeWith(new CommonSubscriber<Boolean>(mView, "切换模式失败ヽ(≧Д≦)ノ") {
                        @Override
                        public void onNext(Boolean nightMode) {
                            mView.useNightMode(nightMode);
                        }
                    })
    );
}
RxSwingPluginsTest.java 文件源码
项目:RxJava2Swing
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Verifies that an exception thrown by the {@code onSchedule} hook propagates to the caller
 * of {@code RxSwingPlugins.onSchedule}, and that after {@code reset()} the runnable is
 * returned unchanged.
 */
@Test
public void onScheduleCrashes() {
    RxSwingPlugins.setOnSchedule(new Function<Runnable, Runnable>() {
        @Override
        public Runnable apply(Runnable r) throws Exception {
            throw new IllegalStateException("Failure");
        }
    });

    try {
        RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE);
        Assert.fail("Should have thrown!");
    } catch (IllegalStateException ex) {
        Assert.assertEquals("Failure", ex.getMessage());
    } finally {
        // Always remove the throwing hook, even when an assertion above fails, so it cannot
        // leak into tests that run afterwards.
        RxSwingPlugins.reset();
    }

    Assert.assertSame(Functions.EMPTY_RUNNABLE, RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE));
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Requests a news item from {@code allocineService}.
 *
 * <p>Builds the request parameters from {@code idNews} and {@code profile} with
 * {@code ServiceSecurity.construireParams}, obtains {@code sed} and {@code sig} from
 * {@code ServiceSecurity}, and calls {@code allocineService.news(...)}.
 *
 * <p>NOTE(review): the previous summary read "Informations sur une personne"
 * ("information about a person"), which does not match this method; it looks like a
 * copy-paste leftover - confirm.
 *
 * <p>NOTE(review): the mapper below returns {@code null}. RxJava 2 does not allow a mapper
 * function to return null, so this presumably fails with a NullPointerException when the
 * response arrives - TODO confirm and extract the {@code News} from the response instead.
 *
 * @param idNews  identifier passed as {@code AllocineService.CODE}
 * @param profile value passed as {@code AllocineService.PROFILE}
 * @return the mapped result of the service call
 */
private Single<News> news(String idNews, String profile) {
final String params = ServiceSecurity.construireParams(false,
AllocineService.CODE, idNews,
AllocineService.PROFILE, profile
);
// The signature is computed from the parameter string and the sed value.
final String sed = ServiceSecurity.getSED();
final String sig = ServiceSecurity.getSIG(params, sed);
return allocineService.news(idNews, profile, sed, sig)
.map(new Function<AllocineResponse, News>() {
@Override
public News apply(AllocineResponse allocineResponse) throws Exception {
// NOTE(review): placeholder result; the response is ignored here.
return null;
}
});
}
StoreService.java 文件源码
项目:store2store
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Looks up an item by id.
 *
 * <p>Without a synced store only the {@code dao} lookup is returned. With a synced store the
 * result is the concatenation of {@code syncedStore.getById(id)} followed by the {@code dao}
 * lookup whose item is passed to {@code syncedStore.insertOrUpdate}.
 *
 * @param id identifier of the item
 * @return the concatenated lookups
 */
@Override
public Flowable<Optional<T>> getById(final int id) {
    final List<Flowable<Optional<T>>> sources = new ArrayList<>();
    Flowable<Optional<T>> fromDao = dao.getById(id);

    if (hasSyncedStore()) {
        final Function<Optional<T>, Flowable<Optional<T>>> writeToSyncedStore =
                new Function<Optional<T>, Flowable<Optional<T>>>() {
                    @Override
                    public Flowable<Optional<T>> apply(final Optional<T> found) throws Exception {
                        return syncedStore.insertOrUpdate(found.get());
                    }
                };
        fromDao = fromDao.flatMap(writeToSyncedStore);
        // The synced store is queried first; the dao result follows it in the concat below.
        sources.add(syncedStore.getById(id));
    }

    sources.add(fromDao);
    return Flowable.concat(sources);
}
FirstRemoteStrategy.java 文件源码
项目:SuperHttp
阅读 44
收藏 0
点赞 0
评论 0
/**
 * Remote-first strategy: emits the first usable result from the remote source, then the cache.
 *
 * <p>The remote and cache observables are concatenated in that order, results without cache
 * data are filtered out, and only the first remaining element is emitted.
 *
 * <p>A failing cache read is turned into an empty stream. Previously the result of
 * {@code onErrorReturn} was discarded (operators return a new Observable and do not modify
 * the receiver), and its handler returned {@code null}, which RxJava 2 does not accept as an
 * item.
 *
 * @param apiCache cache used for loading and storing
 * @param cacheKey key of the cached entry
 * @param source   the remote source
 * @param type     type passed to the cache lookup
 * @return an Observable emitting at most one result
 */
@Override
public <T> Observable<CacheResult<T>> execute(ApiCache apiCache, String cacheKey, Observable<T> source, Type type) {
    // Keep the operator's result and swallow cache errors as "no cached value".
    Observable<CacheResult<T>> cache = loadCache(apiCache, cacheKey, type)
            .onErrorResumeNext(Observable.<CacheResult<T>>empty());
    Observable<CacheResult<T>> remote = loadRemote(apiCache, cacheKey, source);
    return Observable.concat(remote, cache).filter(new Predicate<CacheResult<T>>() {
        @Override
        public boolean test(CacheResult<T> tCacheResult) throws Exception {
            return tCacheResult != null && tCacheResult.getCacheData() != null;
        }
    }).firstElement().toObservable();
}
RxAdapterHelper.java 文件源码
项目:MultiTypeRecyclerViewAdapter
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Runs {@code handleRefresh} for the given request on {@code Schedulers.computation()} and
 * passes the resulting {@code DiffUtil.DiffResult} to {@code handleResult} on the Android
 * main thread.
 *
 * @param refreshData the refresh request carrying new data, header, footer and types
 */
@Override
protected void startRefresh(HandleBase<MultiHeaderEntity> refreshData) {
    final Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult> computeDiff =
            new Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult>() {
                @Override
                public DiffUtil.DiffResult apply(@NonNull HandleBase<MultiHeaderEntity> request) throws Exception {
                    return handleRefresh(
                            request.getNewData(),
                            request.getNewHeader(),
                            request.getNewFooter(),
                            request.getType(),
                            request.getRefreshType());
                }
            };
    final Consumer<DiffUtil.DiffResult> applyDiff = new Consumer<DiffUtil.DiffResult>() {
        @Override
        public void accept(@NonNull DiffUtil.DiffResult diff) throws Exception {
            handleResult(diff);
        }
    };

    Flowable.just(refreshData)
            .onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .map(computeDiff)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyDiff);
}
AppDataManager.java 文件源码
项目:android-mvp-architecture
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Seeds the question table from the bundled JSON asset when it is empty.
 *
 * <p>If {@code mDbHelper.isQuestionEmpty()} emits {@code true}, the asset named by
 * {@code AppConstants.SEED_DATABASE_QUESTIONS} is parsed into a list of {@code Question}
 * and passed to {@code saveQuestionList}; otherwise {@code false} is emitted.
 *
 * @return the result of saving, or {@code false} when nothing was seeded
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {
    // Only fields carrying the Expose annotation take part in deserialisation.
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> seedIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty)
                        throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type questionListType = $Gson$Types
                            .newParameterizedTypeWithOwner(null, List.class, Question.class);
                    String seedJson = CommonUtils.loadJSONFromAsset(
                            mContext, AppConstants.SEED_DATABASE_QUESTIONS);
                    List<Question> seedQuestions = gson.fromJson(seedJson, questionListType);
                    return saveQuestionList(seedQuestions);
                }
            };

    return mDbHelper.isQuestionEmpty().concatMap(seedIfEmpty);
}
PixelEffectFragment.java 文件源码
项目:android-study
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Initialises the page and decodes {@code R.drawable.test1} into {@code mBitmap}.
 *
 * <p>The chain is subscribed on {@code Schedulers.io()}, observed on a scheduler built from
 * {@code getHandler()}, and bound to the {@code FragmentEvent.DESTROY} lifecycle event. The
 * decoded bitmap is stored in {@code mBitmap} and shown in {@code mImage}.
 *
 * @param parent not used by this method
 */
@Override protected void initView(View parent) {
    requestBaseInit(getPageTitle());

    final Function<String, Bitmap> decodeBitmap = new Function<String, Bitmap>() {
        @Override public Bitmap apply(@NonNull String ignoredSeed) throws Exception {
            return BitmapFactory.decodeResource(getResources(), R.drawable.test1);
        }
    };
    final Consumer<Bitmap> showBitmap = new Consumer<Bitmap>() {
        @Override public void accept(@NonNull Bitmap decoded) throws Exception {
            mBitmap = decoded;
            mImage.setImageBitmap(mBitmap);
        }
    };

    Observable.just("123")
            .subscribeOn(Schedulers.io())
            .map(decodeBitmap)
            .observeOn(PausedHandlerScheduler.from(getHandler()))
            .compose(mLifecycleProvider.<Bitmap>bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe(showBitmap);
}
AndroidSchedulersTest.java 文件源码
项目:GitHub
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Verifies that every call to {@code AndroidSchedulers.mainThread()} goes through the
 * installed main-thread scheduler handler and returns the scheduler the handler supplies.
 */
@Test
public void mainThreadCallsThroughToHook() {
    final AtomicInteger hookCalls = new AtomicInteger();
    final Scheduler replacement = new EmptyScheduler();
    final Function<Scheduler, Scheduler> hook = new Function<Scheduler, Scheduler>() {
        @Override public Scheduler apply(Scheduler scheduler) {
            hookCalls.getAndIncrement();
            return replacement;
        }
    };
    RxAndroidPlugins.setMainThreadSchedulerHandler(hook);

    // Two consecutive lookups: each must hit the hook exactly once more.
    for (int expectedCalls = 1; expectedCalls <= 2; expectedCalls++) {
        assertSame(replacement, AndroidSchedulers.mainThread());
        assertEquals(expectedCalls, hookCalls.get());
    }
}
HandlerSchedulerTest.java 文件源码
项目:GitHub
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Verifies that {@code scheduleDirect} passes the runnable to the schedule handler and runs
 * the runnable returned by the handler instead of the original one.
 */
@Test
public void directScheduleOnceUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable runnable) {
            seenByHook.set(runnable);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(hook);

    final CountingRunnable original = new CountingRunnable();
    scheduler.scheduleDirect(original);

    // The hook must have received the runnable we scheduled.
    assertSame(original, seenByHook.get());

    runUiThreadTasks();

    // Only the runnable returned from the hook may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, original.get());
}
HandlerSchedulerTest.java 文件源码
项目:GitHub
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Verifies that a delayed {@code scheduleDirect} passes the runnable to the schedule handler
 * and, once the delay has elapsed, runs the runnable returned by the handler.
 */
@Test
public void directScheduleOnceWithDelayUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable runnable) {
            seenByHook.set(runnable);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(hook);

    final CountingRunnable original = new CountingRunnable();
    scheduler.scheduleDirect(original, 1, MINUTES);

    // The hook must have received the runnable we scheduled.
    assertSame(original, seenByHook.get());

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();

    // Only the runnable returned from the hook may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, original.get());
}
HandlerSchedulerTest.java 文件源码
项目:GitHub
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Verifies that a delayed {@code Worker.schedule} passes the runnable to the schedule handler
 * and, once the delay has elapsed, runs the runnable returned by the handler.
 */
@Test
public void workerScheduleOnceWithDelayUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable runnable) {
            seenByHook.set(runnable);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(hook);

    final Worker worker = scheduler.createWorker();
    final CountingRunnable original = new CountingRunnable();
    worker.schedule(original, 1, MINUTES);

    // The hook must have received the runnable we scheduled.
    assertSame(original, seenByHook.get());

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();

    // Only the runnable returned from the hook may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, original.get());
}
lessonD_AdvancedStreams.java 文件源码
项目:code-examples-android-expert
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Lesson exercise: splits a stream of integers into groups keyed by {@code integer % 2},
 * and for each group sums its values (as doubles) and writes the sum into
 * {@code averages[key]}.
 *
 * <p>NOTE(review): the source is a synchronous {@code Observable.just}, so the inner
 * subscriptions presumably finish before the assertions run; in that case {@code averages}
 * would hold the per-group sums rather than 0. Confirm whether the asserted zeros are
 * intentional placeholders for the exercise.
 */
@Test
public void shouldBeNecessaryToSubscribetoStreamAfterSplitting() {
// Slot 0 receives the result for even numbers, slot 1 for odd numbers (key = integer % 2).
final double[] averages = {0, 0};
Observable<Integer> numbers = Observable.just(22, 22, 99, 22, 101, 22);
Function<Integer, Integer> keySelector = integer -> integer % 2;
Observable<GroupedObservable<Integer, Integer>> split = numbers.groupBy(keySelector);
split.subscribe(
group -> {
Observable<Double> convertToDouble = group.map(integer -> (double) integer);
// Stores the incoming value in the slot selected by the group's key.
Function<Double, Double> insertIntoAveragesArray = aDouble -> averages[group.getKey()] = aDouble;
// Each group has to be subscribed to itself, otherwise nothing is written.
convertToDouble.reduce((t1, t2) -> t1+t2).map(insertIntoAveragesArray).subscribe();
}
);
assertThat(averages[0]).isEqualTo(0);
assertThat(averages[1]).isEqualTo(0);
}
TestStore.java 文件源码
项目:store2store
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Returns the stored models whose id matches one of the given items.
 *
 * <p>The full list comes from {@code getAll(null, null)}. For every requested item, in
 * order, each stored model with the same id is appended to the result, which is then wrapped
 * with {@code Optional.wrap}.
 *
 * @param items the models to look up by id
 * @return a Flowable emitting the wrapped list of matches
 */
@Override
public Flowable<Optional<List<TestModel>>> getAll(final List<TestModel> items) {
    final Function<Optional<List<TestModel>>, Optional<List<TestModel>>> keepRequested =
            new Function<Optional<List<TestModel>>, Optional<List<TestModel>>>() {
                @Override
                public Optional<List<TestModel>> apply(Optional<List<TestModel>> fullList) throws Exception {
                    final List<TestModel> matches = new ArrayList<>();
                    for (TestModel wanted : items) {
                        for (TestModel candidate : fullList.get()) {
                            if (candidate.getId() != wanted.getId()) {
                                continue;
                            }
                            matches.add(candidate);
                        }
                    }
                    return Optional.wrap(matches);
                }
            };
    return getAll(null, null).map(keepRequested);
}
UserModel.java 文件源码
项目:MVPArmsTest1
阅读 40
收藏 0
点赞 0
评论 0
/**
 * Loads a page of users through the cache service.
 *
 * <p>The Retrofit request for {@code USERS_PER_PAGE} users is handed to the cache service
 * together with a {@code DynamicKey} built from {@code lastIdQueried} and an
 * {@code EvictDynamicKey} built from {@code update}; the data of the reply is emitted.
 *
 * @param lastIdQueried id used for the request and as the cache key
 * @param update        value passed to {@code EvictDynamicKey}
 * @return an Observable emitting the list of users
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    final Observable<List<User>> remoteUsers = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    final Function<Reply<List<User>>, ObservableSource<List<User>>> unwrapReply =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    // Use the RxCache cache: a refresh does not read the cache, loading more reads it.
    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getUsers(remoteUsers, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
TicTacToeView.java 文件源码
项目:RIBs
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Merges the click streams of all nine board buttons into one stream of coordinates.
 *
 * <p>Each click on {@code imageButtons[row][col]} is mapped to a
 * {@code BoardCoordinate(row, col)}.
 *
 * @return an Observable emitting the coordinate of every clicked square
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
    // Walk the 3x3 board row by row using a single cell index.
    for (int cell = 0; cell < 3 * 3; cell++) {
        final int row = cell / 3;
        final int col = cell % 3;
        final Function<Object, BoardCoordinate> toCoordinate =
                new Function<Object, BoardCoordinate>() {
                    @Override
                    public BoardCoordinate apply(Object irrelevant) throws Exception {
                        return new BoardCoordinate(row, col);
                    }
                };
        clickStreams.add(RxView.clicks(imageButtons[row][col]).map(toCoordinate));
    }
    return Observable.merge(clickStreams);
}
DownloadHelper.java 文件源码
项目:GitHub
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Checks the given url through {@code downloadApi.check}.
 *
 * <p>A successful response is passed to {@code saveFileInfo}; an unsuccessful one falls back
 * to {@code checkUrlByGet}. The chain is composed with
 * {@code retry(REQUEST_RETRY_HINT, maxRetryCount)}.
 *
 * @param url the url to check
 * @return the source produced by the selected follow-up step
 */
private ObservableSource<Object> checkUrl(final String url) {
    final Function<Response<Void>, ObservableSource<Object>> handleResponse =
            new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> resp)
                        throws Exception {
                    if (resp.isSuccessful()) {
                        return saveFileInfo(url, resp);
                    }
                    return checkUrlByGet(url);
                }
            };
    return downloadApi.check(url)
            .flatMap(handleResponse)
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
RetryWithDelay.java 文件源码
项目:XinFramework
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Maps each upstream error to either a delayed retry signal or the error itself.
 *
 * <p>{@code retryCount} is incremented for every error. While it does not exceed
 * {@code maxRetries}, a timer of {@code retryDelaySecond} seconds is returned; afterwards
 * the error is passed along.
 *
 * @param throwableObservable the stream of errors
 * @return the stream of retry signals
 */
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
    final Function<Throwable, ObservableSource<?>> retryOrFail =
            new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                    if (++retryCount > maxRetries) {
                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                    // When this Observable calls onNext, the original Observable will be
                    // retried (i.e. re-subscribed).
                    Log.d("get error, it will try after " + retryDelaySecond
                            + " second, retry count " + retryCount);
                    return Observable.timer(retryDelaySecond, TimeUnit.SECONDS);
                }
            };
    return throwableObservable.flatMap(retryOrFail);
}
LocationRepository.java 文件源码
项目:My-Android-Base-Code
阅读 93
收藏 0
点赞 0
评论 0
/**
 * Returns a location, using {@code mLastLocation} when no new request is needed or as a
 * fallback when the request fails.
 *
 * <p>A successful request updates the cache through {@code setLocationCache}. The request
 * times out after {@code LOCATION_REQUEST_TIMEOUT} milliseconds. On error: if
 * {@code mLastLocation} is available it is emitted; otherwise a timeout becomes a
 * {@code LocationTimeoutException} and any other error is passed through.
 *
 * @param request the location request
 * @return a Single emitting the location
 */
private Single<Location> getLocation(LocationRequest request) {
    if (!shouldRequestNewLocation()) {
        return Single.just(mLastLocation);
    }

    final Consumer<Location> rememberLocation = new Consumer<Location>() {
        @Override
        public void accept(Location location) throws Exception {
            setLocationCache(location);
        }
    };
    final Function<Throwable, SingleSource<? extends Location>> fallBackToLast =
            new Function<Throwable, SingleSource<? extends Location>>() {
                @Override
                public SingleSource<? extends Location> apply(Throwable e) throws Exception {
                    if (e instanceof TimeoutException && mLastLocation == null) {
                        return Single.error(new LocationTimeoutException());
                    }
                    if (mLastLocation == null) {
                        return Single.error(e);
                    }
                    return Single.just(mLastLocation);
                }
            };

    return mFusedLocation.getLocation(request)
            .doOnSuccess(rememberLocation)
            .timeout(LOCATION_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(fallBackToLast);
}
UserNetworkCalls.java 文件源码
项目:GSB-2017-Android
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Logs a user in and converts the JSON response into a {@code User}.
 *
 * <p>A {@code null} element produces an error with the message {@code "Login Failed"}; a
 * non-object element produces {@code "Expected a JSON Object"}. For a JSON object the user
 * is deserialised with Gson, the full name and username are stored through
 * {@code PrefUtils}, and the user is emitted. Results are observed on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @param username the login name
 * @param password the password
 * @return an Observable emitting the logged-in user
 */
public static Observable<User> loginUser(final String username , String password) {
    final UserService userApi = ServiceGenerator.createService(UserService.class);

    final Function<JsonElement, Observable<User>> parseUser =
            new Function<JsonElement, Observable<User>>() {
                @Override
                public Observable<User> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.error(new Exception("Login Failed"));
                    }
                    Log.i("Login User" , "JSON: "+jsonElement.toString());
                    if (!jsonElement.isJsonObject()) {
                        return Observable.error(new Exception("Expected a JSON Object"));
                    }
                    User loggedIn = (new Gson()).fromJson(jsonElement.getAsJsonObject() , User.class);
                    PrefUtils.setUsername(loggedIn.getUsrFullname());
                    PrefUtils.setUserEmail(loggedIn.getUsrUsername());
                    return Observable.just(loggedIn);
                }
            };

    return userApi.login(UrlManager.loginURL() , new User(username , password))
            .flatMap(parseUser)
            .observeOn(AndroidSchedulers.mainThread());
}
GroupByExampleActivity.java 文件源码
项目:RxJava2-Android-Sample
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Demonstrates {@code groupBy}: the integers 0..7 are grouped by parity under the keys
 * {@code "偶数"} (even) and {@code "奇数"} (odd); each group's key is logged and the group is
 * subscribed with the observer returned by {@code getObserver(key)}.
 */
private void doSomeWork() {
    Observable.range(0, 8).groupBy(new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
            return integer % 2 == 0 ? "偶数" : "奇数";
        }
    }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
        @Override
        public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
            String key = stringIntegerGroupedObservable.getKey();
            Log.i(TAG, "accept: key=" + key);
            // Both groups are handled the same way, so the former if/else on the key (whose
            // two branches were identical) is not needed.
            stringIntegerGroupedObservable.subscribe(getObserver(key));
        }
    });
}
IconShowPresenter.java 文件源码
项目:MBEStyle
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Loads the icons listed in the {@code R.array.whatsnew} string array.
 *
 * <p>Each name is turned into an {@code IconBean} whose id is the drawable identifier looked
 * up under {@code BuildConfig.APPLICATION_ID}. The collected list is produced on
 * {@code Schedulers.io()} and delivered to {@code mView.onLoadData} on the Android main
 * thread.
 *
 * @return the Disposable of the subscription
 */
public Disposable getWhatsNewIcons() {
    final Function<String, IconBean> toIconBean = new Function<String, IconBean>() {
        @Override
        public IconBean apply(@NonNull String iconName) throws Exception {
            IconBean icon = new IconBean();
            icon.id = mView.getResources().getIdentifier(iconName, "drawable", BuildConfig.APPLICATION_ID);
            icon.name = iconName;
            return icon;
        }
    };
    final Consumer<List<IconBean>> deliver = new Consumer<List<IconBean>>() {
        @Override
        public void accept(List<IconBean> icons) throws Exception {
            mView.onLoadData(icons);
        }
    };

    return Observable.fromArray(mView.getResources().getStringArray(R.array.whatsnew))
            .map(toIconBean)
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(deliver);
}
GuardTest.java 文件源码
项目:J-Chain
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Verifies that when the guard does not throw, the {@code onErrorMap} value {@code "!"} is
 * not used and the chain yields the {@code defaultIfEmpty} value.
 */
@Test
public void onErrorMapWithNoErrorThenReturnEmptyOptional() {
    final Consumer<Integer> harmlessGuard = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            // do nothing
        }
    };
    final Function<Throwable, String> errorToBang = new Function<Throwable, String>() {
        @Override
        public String apply(Throwable throwable) throws Exception {
            return "!";
        }
    };

    String result = Chain.let(0)
            .guard(harmlessGuard)
            .onErrorMap(errorToBang)
            .defaultIfEmpty("")
            .call();

    assertEquals("", result);
}
TicTacToeView.java 文件源码
项目:RIBs
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Merges the click streams of all nine board buttons into one stream of coordinates.
 *
 * <p>Each click on {@code imageButtons[row][col]} is mapped to a
 * {@code BoardCoordinate(row, col)}.
 *
 * @return an Observable emitting the coordinate of every clicked square
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
    // Walk the 3x3 board row by row using a single cell index.
    for (int cell = 0; cell < 3 * 3; cell++) {
        final int row = cell / 3;
        final int col = cell % 3;
        final Function<Object, BoardCoordinate> toCoordinate =
                new Function<Object, BoardCoordinate>() {
                    @Override
                    public BoardCoordinate apply(Object irrelevant) throws Exception {
                        return new BoardCoordinate(row, col);
                    }
                };
        clickStreams.add(RxView.clicks(imageButtons[row][col]).map(toCoordinate));
    }
    return Observable.merge(clickStreams);
}
RxPayUtils.java 文件源码
项目:RxPay
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Creates a transformer that lets successful pay results through unchanged and turns an
 * unsuccessful one into a {@code PayFailedException} carrying the result's error info.
 *
 * @return the checking transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    final Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
        @Override
        public PayResult apply(PayResult payResult) throws Exception {
            if (payResult.isSucceed()) {
                return payResult;
            }
            throw new PayFailedException(payResult.getErrInfo());
        }
    };
    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            return upstream.map(requireSuccess);
        }
    };
}