/**
 * Builds a transformer that attaches logging for the lifecycle events selected by
 * {@code bitMask}.
 *
 * <p>Loggers are composed in a fixed order: subscribe, error, success, dispose.
 * For the success logger, {@code LOG_NEXT_DATA} wins over {@code LOG_NEXT_EVENT};
 * at most one of the two is attached.
 *
 * @param msg     message handed to every attached logger
 * @param bitMask combination of the {@code LOG_*} flags to enable
 * @param <T>     value type of the stream
 * @return transformer applying the selected loggers
 */
public static <T> SingleTransformer<T, T> logSingle(final String msg, final int bitMask) {
    return upstream -> {
        // Decode the mask once so the wiring below reads as plain booleans.
        final boolean wantsSubscribe = (bitMask & LOG_SUBSCRIBE) > 0;
        final boolean wantsError = (bitMask & LOG_ERROR) > 0;
        final boolean wantsData = (bitMask & LOG_NEXT_DATA) > 0;
        final boolean wantsEvent = (bitMask & LOG_NEXT_EVENT) > 0;
        final boolean wantsDispose = (bitMask & LOG_DISPOSE) > 0;

        if (wantsSubscribe) {
            upstream = upstream.compose(sLogSubscribe(msg));
        }
        if (wantsError) {
            upstream = upstream.compose(sLogError(msg));
        }
        if (wantsData) {
            upstream = upstream.compose(sLogSuccess(msg));
        } else if (wantsEvent) {
            upstream = upstream.compose(sLogSuccessEvent(msg));
        }
        if (wantsDispose) {
            upstream = upstream.compose(sLogDispose(msg));
        }
        return upstream;
    };
}
java类io.reactivex.SingleTransformer的实例源码
RxLog.java 文件源码
项目:RxLog
阅读 46
收藏 0
点赞 0
评论 0
RealtimeMessagesProcessor.java 文件源码
项目:Phoenix-for-VK
阅读 40
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that writes the result's DTOs through {@code messagesInteractor},
 * refreshes the changed dialogs, and then appends the cached messages looked up by id.
 *
 * @return transformer emitting the same {@code TmpResult} after {@code appendModel} was applied
 */
private SingleTransformer<TmpResult, TmpResult> storeToCacheAndReturn() {
    return upstream -> upstream
            // the actual insertion
            .flatMap(tmp -> {
                return this.messagesInteractor
                        .insertMessages(tmp.getAccountId(), tmp.collectDtos())
                        .andThen(refreshChangedDialogs(tmp))
                        .andThen(Single.just(tmp));
            })
            // the actual read-back from the local database
            .flatMap(tmp -> {
                // the predicate accepts every message, so all ids are collected
                List<Integer> cachedIds = collectIds(tmp.getData(), msg -> true);
                return this.messagesInteractor
                        .findCachedMessages(tmp.getAccountId(), cachedIds)
                        .map(tmp::appendModel);
            });
}
WallsImpl.java 文件源码
项目:Phoenix-for-VK
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that converts post entities into {@code Post} models.
 *
 * <p>Owner ids are gathered from all entities first, the owners bundle is loaded once
 * with {@code MODE_ANY}, and every entity is then converted against that bundle.
 *
 * @param accountId account id passed to the owners lookup
 * @return transformer from entity list to model list, in the same order
 */
private SingleTransformer<List<PostEntity>, List<Post>> dbos2models(int accountId) {
    return upstream -> upstream.flatMap(entities -> {
        final VKOwnIds ownerIds = new VKOwnIds();
        Entity2Model.fillOwnerIds(ownerIds, entities);

        return ownersInteractor
                .findBaseOwnersDataAsBundle(accountId, ownerIds.getAll(), IOwnersInteractor.MODE_ANY)
                .map(bundle -> {
                    List<Post> models = new ArrayList<>(entities.size());
                    for (PostEntity entity : entities) {
                        models.add(Entity2Model.buildPostFromDbo(entity, bundle));
                    }
                    return models;
                });
    });
}
MessagesInteractor.java 文件源码
项目:Phoenix-for-VK
阅读 40
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that converts message entities into {@code Message} models.
 *
 * <p>Owner ids are gathered from all entities first, the owners bundle is loaded once
 * with {@code MODE_ANY}, and every entity is then converted against that bundle.
 *
 * @param accountId account id passed to the owners lookup and to the model builder
 * @return transformer from entity list to model list, in the same order
 */
private SingleTransformer<List<MessageEntity>, List<Message>> toMessageModels(int accountId) {
    return upstream -> upstream.flatMap(entities -> {
        VKOwnIds ownerIds = new VKOwnIds();
        Entity2Model.fillOwnerIds(ownerIds, entities);

        return this.ownersInteractor
                .findBaseOwnersDataAsBundle(accountId, ownerIds.getAll(), IOwnersInteractor.MODE_ANY)
                .map(bundle -> {
                    final List<Message> models = new ArrayList<>(entities.size());
                    for (MessageEntity entity : entities) {
                        models.add(Entity2Model.buildMessageFromDbo(accountId, entity, bundle));
                    }
                    return models;
                });
    });
}
CommentsInteractor.java 文件源码
项目:Phoenix-for-VK
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that converts comment entities into {@code Comment} models.
 *
 * <p>Owner ids are collected per comment, the owners bundle is loaded once with
 * {@code MODE_ANY}, and every entity is then converted against that bundle.
 *
 * @param accountId account id passed to the owners lookup
 * @return transformer from entity list to model list, in the same order
 */
private SingleTransformer<List<CommentEntity>, List<Comment>> dbos2models(int accountId) {
    return upstream -> upstream.flatMap(entities -> {
        VKOwnIds ownerIds = new VKOwnIds();
        for (CommentEntity entity : entities) {
            Entity2Model.fillCommentOwnerIds(ownerIds, entity);
        }

        return ownersInteractor
                .findBaseOwnersDataAsBundle(accountId, ownerIds.getAll(), IOwnersInteractor.MODE_ANY)
                .map(bundle -> {
                    List<Comment> models = new ArrayList<>(entities.size());
                    for (CommentEntity entity : entities) {
                        models.add(Entity2Model.buildCommentFromDbo(entity, bundle));
                    }
                    return models;
                });
    });
}
AbstractPresenter.java 文件源码
项目:AndroidMVPresenter
阅读 47
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that subscribes on the IO scheduler, observes on the Android main
 * thread, retries through {@code RetryWithDelay} and registers the subscription's
 * disposable with this presenter.
 *
 * @param <R> value type of the stream
 * @return transformer applying scheduling, retry and disposable tracking
 */
public <R> SingleTransformer<? super R, ? extends R> composeSingle() {
    return new SingleTransformer<R, R>() {
        @Override
        public SingleSource<R> apply(@NonNull Single<R> upstream) {
            // Hands each new disposable to the presenter via addDisposable.
            final Consumer<Disposable> trackDisposable = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    AbstractPresenter.this.addDisposable(disposable);
                }
            };

            return upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .retryWhen(new RetryWithDelay(maxRetry, todoBeforeRetry).forSingle)
                    .doOnSubscribe(trackDisposable);
        }
    };
}
StatusExceptionResumeNextTransformer.java 文件源码
项目:RxGps
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that turns a {@code StatusException} whose status has a resolution
 * back into a regular success carrying the exception's result. Every other error is
 * passed through unchanged.
 *
 * @param <R> result type of the stream
 * @return transformer resuming from resolvable status errors
 */
public static <R extends Result> SingleTransformer<R, R> forSingle() {
    return upstream -> upstream.onErrorResumeNext(throwable -> {
        // Not a status failure: nothing to recover.
        if (!(throwable instanceof StatusException)) {
            return Single.error(throwable);
        }

        StatusException statusFailure = (StatusException) throwable;
        // A status without a resolution stays an error.
        if (!statusFailure.getStatus().hasResolution()) {
            return Single.error(throwable);
        }

        return Single.just((R) statusFailure.getResult());
    });
}
FilterHelper.java 文件源码
项目:DiscogsBrowser
阅读 40
收藏 0
点赞 0
评论 0
/**
* Filters the given list against whether their Title or Subtitle matches the filter text.
*
* <p>An item is kept when its lower-cased subtitle or its lower-cased title contains
* {@code filterText}. The kept items are collected back into a single list.
*
* @return Filtered list.
*/
public SingleTransformer<List<? extends RecyclerViewModel>, List<? extends RecyclerViewModel>> filterByFilterText()
{
return untransformed ->
// The raw (Single) cast discards the generic type of the toList() result.
(Single) untransformed.flattenAsObservable(items -> items)
.filter(item ->
// NOTE(review): filterText itself is not lower-cased here; presumably the caller normalizes it - confirm.
item.getSubtitle().toLowerCase().contains(filterText) || item.getTitle().toLowerCase().contains(filterText))
.toList();
}
FilterHelper.java 文件源码
项目:DiscogsBrowser
阅读 44
收藏 0
点赞 0
评论 0
/**
 * Filters the list to items that are listed as For Sale.
 *
 * <p>A listing is kept only when its status equals {@code "For Sale"} exactly.
 * A listing with a {@code null} status is dropped instead of failing the whole
 * stream with a {@code NullPointerException}.
 *
 * @return Filtered list.
 */
public SingleTransformer<List<Listing>, List<Listing>> filterForSale()
{
    return listingsSingle ->
            listingsSingle.flattenAsObservable(listings -> listings)
                    // Constant-first equals keeps the comparison null-safe.
                    .filter(listing -> "For Sale".equals(listing.getStatus()))
                    .toList();
}
AppModule.java 文件源码
项目:redux-observable
阅读 43
收藏 0
点赞 0
评论 0
/**
 * Provides a {@code SchedulerSingleTransformer} whose transformer subscribes on the IO
 * scheduler and observes on the Android main thread.
 *
 * @return the scheduler transformer provider
 */
@Provides
SchedulerSingleTransformer provideSchedulerSingleTransformer() {
    return new SchedulerSingleTransformer() {
        @SuppressWarnings("unchecked")
        @Override
        public <T> SingleTransformer<T, T> transformer() {
            return source -> {
                return source
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            };
        }
    };
}
SchedulerSingleTransformer.java 文件源码
项目:redux-observable
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Returns a transformer that both subscribes and observes on the trampoline scheduler.
 *
 * <p>The transformer is fully parameterized, so no raw types or unchecked cast are needed.
 *
 * @param <T> value type of the stream
 * @return transformer scheduling the stream on {@code Schedulers.trampoline()}
 */
@Override public <T> SingleTransformer<T, T> transformer() {
    return new SingleTransformer<T, T>() {
        @Override public SingleSource<T> apply(Single<T> upstream) {
            return upstream.subscribeOn(Schedulers.trampoline())
                    .observeOn(Schedulers.trampoline());
        }
    };
}
RealtimeMessagesProcessor.java 文件源码
项目:Phoenix-for-VK
阅读 41
收藏 0
点赞 0
评论 0
/**
 * Builds the transformer that completes, filters and stores incoming messages.
 *
 * <p>Steps, in order: load DTOs for messages that have none, remove messages intercepted
 * by {@code KeyExchangeService}, and, if anything is left, load missing related objects
 * and run {@code storeToCacheAndReturn()}.
 *
 * @return transformer emitting the processed {@code TmpResult}
 */
private SingleTransformer<TmpResult, TmpResult> getAndStore() {
    return upstream -> upstream
            .flatMap(tmp -> {
                // if the source data lacks information, fetch the needed data from the API
                List<Integer> missingIds = collectIds(tmp.getData(), msg -> isNull(msg.getDto()));
                if (!missingIds.isEmpty()) {
                    return networker.vkDefault(tmp.getAccountId())
                            .messages()
                            .getById(missingIds)
                            .map(tmp::appendDtos);
                }
                return Single.just(tmp);
            })
            .map(tmp -> {
                // drop the messages that belong to a key exchange
                removeIf(tmp.getData(), msg -> KeyExchangeService.intercept(app, tmp.getAccountId(), msg.getDto()));
                return tmp;
            })
            .flatMap(tmp -> {
                if (!tmp.getData().isEmpty()) {
                    // identify additional required data that is missing from the local database,
                    // for example information about users, groups or chats;
                    // fetch and store it if necessary
                    return identifyMissingObjectsGetAndStore(tmp)
                            .andThen(Single.just(tmp))
                            // save the messages to the local database and read the "heavy" message objects back
                            .compose(storeToCacheAndReturn());
                }
                return Single.just(tmp);
            });
}
WallsImpl.java 文件源码
项目:Phoenix-for-VK
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that converts a single post entity into a {@code Post} model.
 *
 * <p>The entity's owner ids are collected, the owners bundle is loaded with
 * {@code MODE_ANY}, and the model is built from the entity and that bundle.
 *
 * @param accountId account id passed to the owners lookup
 * @return transformer from entity to model
 */
private SingleTransformer<PostEntity, Post> dbo2model(int accountId) {
    return upstream -> upstream.flatMap(entity -> {
        final VKOwnIds ownerIds = new VKOwnIds();
        Entity2Model.fillPostOwnerIds(ownerIds, entity);

        return ownersInteractor
                .findBaseOwnersDataAsBundle(accountId, ownerIds.getAll(), IOwnersInteractor.MODE_ANY)
                .map(bundle -> Entity2Model.buildPostFromDbo(entity, bundle));
    });
}
AppModule.java 文件源码
项目:Architecture
阅读 41
收藏 0
点赞 0
评论 0
/**
 * Provides a {@code SchedulerSingleTransformer} whose transformer subscribes on the IO
 * scheduler and observes on the Android main thread.
 *
 * @return the scheduler transformer provider
 */
@Provides
SchedulerSingleTransformer provideSchedulerSingleTransformer() {
    return new SchedulerSingleTransformer() {
        @SuppressWarnings("unchecked")
        @Override
        public <T> SingleTransformer<T, T> transformer() {
            return source -> {
                return source
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            };
        }
    };
}
SchedulerSingleTransformer.java 文件源码
项目:Architecture
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Returns a transformer that both subscribes and observes on the trampoline scheduler.
 *
 * <p>The transformer is fully parameterized, so no raw types or unchecked cast are needed.
 *
 * @param <T> value type of the stream
 * @return transformer scheduling the stream on {@code Schedulers.trampoline()}
 */
@Override public <T> SingleTransformer<T, T> transformer() {
    return new SingleTransformer<T, T>() {
        @Override public SingleSource<T> apply(Single<T> upstream) {
            return upstream.subscribeOn(Schedulers.trampoline())
                    .observeOn(Schedulers.trampoline());
        }
    };
}
RxUtils.java 文件源码
项目:CleanArchitecture
阅读 47
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that subscribes on the IO scheduler and observes on the Android
 * main thread.
 *
 * @param <T> value type of the stream
 * @return transformer applying the two schedulers
 */
public static <T> SingleTransformer<T, T> applyCommonSchedulersSingle() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(@NonNull Single<T> upstream) {
            final Single<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 38
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that retries the upstream after a fixed delay when it fails with
 * an {@code HttpException} carrying status code 403.
 *
 * <p>At most {@code MAX_COUNT} retries are made per subscription, each delayed by
 * {@code DELAY_SECOND} seconds. Any other error, or a 403 once the budget is used up,
 * is passed downstream.
 *
 * <p>The attempt counter lives in the per-subscription mapper. Keeping it on the
 * {@code retryWhen} handler would share it between all subscriptions of the composed
 * {@code Single}, so a later subscription would start with a used-up budget.
 *
 * @param <T> value type of the stream
 * @return transformer applying the retry policy
 */
private <T> SingleTransformer<T, T> retry() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.retryWhen(new Function<Flowable<Throwable>, Publisher<Object>>() {
                private final int MAX_COUNT = 3;
                private final int DELAY_SECOND = 10;

                @Override
                public Publisher<Object> apply(Flowable<Throwable> throwableFlowable) throws Exception {
                    // A new mapper, and so a new counter, is created for every subscription.
                    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
                        private int count = 0;

                        @Override
                        public Publisher<?> apply(Throwable throwable) throws Exception {
                            if (count++ < MAX_COUNT && throwable instanceof HttpException) {
                                final HttpException httpException = (HttpException) throwable;
                                if (httpException.code() == 403) {
                                    return Flowable.timer(DELAY_SECOND, TimeUnit.SECONDS);
                                }
                            }
                            return Flowable.error(throwable);
                        }
                    });
                }
            });
        }
    };
}
AbstractPresenter.java 文件源码
项目:Open-Mam
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that subscribes on the IO scheduler, observes on the Android main
 * thread and passes each subscription's disposable to this presenter's {@code call}.
 *
 * @param <R> value type of the stream
 * @return transformer applying scheduling and the subscribe callback
 */
public <R> SingleTransformer<? super R, ? extends R> compose() {
    return new SingleTransformer<R, R>() {
        @Override
        public SingleSource<R> apply(@NonNull Single<R> upstream) {
            final Single<R> scheduled = upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            return scheduled.doOnSubscribe(AbstractPresenter.this::call);
        }
    };
}
Controller.java 文件源码
项目:bigbang
阅读 43
收藏 0
点赞 0
评论 0
/**
 * Provides the Io schedule {@link Single} transformation.
 *
 * <p>Returns the shared {@code singleIoTransformer} cast to the requested value type.
 * That transformer is meant to subscribe the stream on an Io bound {@link Schedulers}
 * scheduler and observe it on the Android main thread.
 *
 * @param <T> value type of the stream
 * @return The stream with the schedule transformation
 */
@CheckResult
@NonNull
protected <T> SingleTransformer<T, T> applySingleIoSchedulers() {
    //noinspection unchecked
    final SingleTransformer<T, T> typedTransformer = (SingleTransformer<T, T>) singleIoTransformer;
    return typedTransformer;
}
SingleUseCase.java 文件源码
项目:EasyMVP
阅读 53
收藏 0
点赞 0
评论 0
/**
 * Creates the use case and prepares its scheduling transformer.
 *
 * <p>The transformer subscribes on the scheduler of {@code useCaseExecutor} and observes
 * on the scheduler of {@code postExecutionThread}.
 *
 * @param useCaseExecutor     supplies the scheduler used for subscription
 * @param postExecutionThread supplies the scheduler used for observation
 */
public SingleUseCase(final UseCaseExecutor useCaseExecutor,
                     final PostExecutionThread postExecutionThread) {
    super(useCaseExecutor, postExecutionThread);
    schedulersTransformer = new SingleTransformer<R, R>() {
        @Override
        public Single<R> apply(Single<R> upstream) {
            final Single<R> onExecutor = upstream.subscribeOn(useCaseExecutor.getScheduler());
            return onExecutor.observeOn(postExecutionThread.getScheduler());
        }
    };
}
LifecycleRxJavaBinder.java 文件源码
项目:UseCases
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that routes the stream through a {@code LiveData} bound to the
 * given lifecycle owner and converts it back to a {@code Single}.
 *
 * @param lifecycleOwner owner passed to {@code LiveDataReactiveStreams.toPublisher}
 * @param <T>            value type of the stream
 * @return transformer emitting the single element, or an error as per {@code singleOrError()}
 */
public static <T> SingleTransformer<T, T> applySingle(LifecycleOwner lifecycleOwner) {
    return upstream -> {
        final LiveData<T> asLiveData = LiveDataReactiveStreams.fromPublisher(upstream.toFlowable());
        final Flowable<T> lifecycleBound = Flowable.fromPublisher(
                LiveDataReactiveStreams.toPublisher(lifecycleOwner, asLiveData));
        return lifecycleBound.singleOrError();
    };
}
NetworkResponse.java 文件源码
项目:DirtyAndroid
阅读 121
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that unwraps a {@code Response}.
 *
 * <p>A successful response emits its body. Otherwise the error body is adapted through
 * {@code errorAdapter} and emitted as a {@code NetworkException}. If reading or adapting
 * the error body throws, that exception is emitted wrapped in a {@code RuntimeException}.
 *
 * @param <T> body type of the response
 * @return transformer from response to body
 */
public <T> SingleTransformer<Response<T>, T> process() {
    return upstream -> upstream.flatMap(response -> {
        if (!response.isSuccessful()) {
            try {
                String error = errorAdapter.adapt(response.errorBody().string());
                return Single.error(new NetworkException(error));
            } catch (java.lang.Exception exception) {
                return Single.error(new RuntimeException(exception));
            }
        }
        return Single.just(response.body());
    });
}
TransformationsBehaviour.java 文件源码
项目:DirtyAndroid
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that subscribes on {@code backgroundThread}, observes on
 * {@code mainThread}, applies the {@code lifecycle} transformer and handles errors.
 *
 * <p>A {@code CancellationException} is replaced by {@code Single.never()}, so downstream
 * receives neither a value nor an error. Any other error is re-emitted.
 *
 * @param <T> value type of the stream
 * @return transformer applying scheduling, lifecycle binding and cancellation handling
 */
public <T> SingleTransformer<T, T> safely() {
return single -> single
.subscribeOn(backgroundThread)
.<T>observeOn(mainThread)
// NOTE(review): presumably lifecycle signals unbinding with a CancellationException - confirm.
.<T>compose(lifecycle)
.<T>onErrorResumeNext(error -> {
// Cancellation is swallowed: the stream simply never terminates.
if (error instanceof CancellationException) return Single.never();
return Single.error((Throwable) error);
});
}
TransformationsBehaviour.java 文件源码
项目:DirtyAndroid
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that shows a formatted error in a snack bar and then replaces the
 * error with {@code Single.never()}, so downstream receives no terminal event.
 *
 * @param <T> value type of the stream
 * @return transformer reporting errors through {@code notifications.showSnackBar}
 */
public <T> SingleTransformer<T, T> reportOnSnackBar() {
    return upstream -> {
        return upstream
                .<T>doOnError(failure -> {
                    Single<String> message = exceptionFormatter.format(failure);
                    notifications.showSnackBar(message);
                })
                // the error was already reported above; do not propagate it
                .<T>onErrorResumeNext(reported -> Single.never());
    };
}
TransformationsBehaviour.java 文件源码
项目:DirtyAndroid
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that shows a formatted error in a toast and then replaces the
 * error with {@code Single.never()}, so downstream receives no terminal event.
 *
 * @param <T> value type of the stream
 * @return transformer reporting errors through {@code notifications.showToast}
 */
public <T> SingleTransformer<T, T> reportOnToast() {
    return upstream -> {
        return upstream
                .<T>doOnError(failure -> {
                    Single<String> message = exceptionFormatter.format(failure);
                    notifications.showToast(message);
                })
                // the error was already reported above; do not propagate it
                .<T>onErrorResumeNext(reported -> Single.never());
    };
}
LifecycleTransformer2xBehaviour.java 文件源码
项目:RxLifecycleInterop
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Adapts the wrapped RxJava 1 single transformer so it can be applied to an RxJava 2
 * {@code Single}.
 *
 * <p>The v2 source is converted to a v1 single, composed with {@code rxSingleTransformer},
 * and converted back to v2.
 *
 * @param <U> value type of the v2 stream
 * @return v2 transformer delegating to {@code rxSingleTransformer}
 */
@Override public <U> SingleTransformer<U, U> forSingle() {
return new SingleTransformer<U, U>() {
@Override public SingleSource<U> apply(io.reactivex.Single<U> source) {
// v2 -> v1
rx.Single<U> rxSourceSingle =
RxJavaInterop.toV1Single(source);
// Unchecked cast: treats the transformer as accepting U and producing the enclosing type's T.
rx.Single<T> rxBoundSingle = rxSourceSingle
.compose((Single.Transformer<? super U, ? extends T>) rxSingleTransformer);
// v1 -> v2; the cast back to Single<U> is unchecked as well.
// NOTE(review): assumes T and U are the same type at runtime - confirm against callers.
return (io.reactivex.Single<U>) RxJavaInterop.toV2Single(rxBoundSingle);
}
};
}
RxUtil.java 文件源码
项目:mobile-buy-sdk-android
阅读 47
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that unwraps the data of a query response.
 *
 * <p>If the response has errors, their messages are joined (one per line, each followed
 * by a newline) and emitted as a {@code RuntimeException}. Otherwise the data is emitted.
 *
 * @param <T> data type of the response
 * @return transformer from response to data
 */
private static <T> SingleTransformer<Response<T>, T> queryResponseDataTransformer() {
    return upstream -> upstream.flatMap(response -> {
        if (!response.errors().isEmpty()) {
            String errorMessage = fold(new StringBuilder(), response.errors(),
                    (builder, error) -> builder.append(error.message()).append("\n")).toString();
            return Single.error(new RuntimeException(errorMessage));
        }
        return Single.just(response.data());
    });
}
RxUtil.java 文件源码
项目:mobile-buy-sdk-android
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that unwraps the data of a graph response.
 *
 * <p>If the response has errors, their messages are joined (one per line, each followed
 * by a newline) and emitted as a {@code RuntimeException}. Otherwise the data is emitted.
 *
 * @param <T> data type of the response
 * @return transformer from graph response to data
 */
private static <T extends AbstractResponse<T>> SingleTransformer<GraphResponse<T>, T> queryResponseDataTransformer() {
    return upstream -> upstream.flatMap(response -> {
        if (!response.errors().isEmpty()) {
            String errorMessage = fold(new StringBuilder(), response.errors(),
                    (builder, error) -> builder.append(error.message()).append("\n")).toString();
            return Single.error(new RuntimeException(errorMessage));
        }
        return Single.just(response.data());
    });
}
Ch9_10.java 文件源码
项目:Learning-RxJava
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that wraps the emitted collection in an unmodifiable view.
 *
 * @param <T> element type of the collection
 * @return transformer mapping the collection through {@code Collections.unmodifiableCollection}
 */
public static <T> SingleTransformer<Collection<T>, Collection<T>> toUnmodifiable() {
    return upstream -> upstream.map(items -> Collections.unmodifiableCollection(items));
}
RxLog.java 文件源码
项目:RxLog
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that logs the success value together with the message and the
 * name of the thread on which the value is delivered.
 *
 * @param msg message included in the log line
 * @param <T> value type of the stream
 * @return transformer logging on success
 */
private static <T> SingleTransformer<T, T> sLogSuccess(final String msg) {
    return upstream -> upstream.doOnSuccess(data -> {
        final String threadName = Thread.currentThread().getName();
        Timber.d("[onSuccess] %s %s [Thread:%s]", msg, data, threadName);
    });
}