/**
 * Builds the transformer this presenter applies to {@link Single} streams: subscribe on the
 * IO scheduler, observe on the Android main thread, retry through {@code RetryWithDelay},
 * and hand the subscription's {@link Disposable} to {@code addDisposable}.
 *
 * @param <R> type of the item carried by the stream
 * @return a transformer intended for {@code Single.compose(...)}
 */
public <R> SingleTransformer<? super R, ? extends R> composeSingle() {
    // The consumer holds no state of its own, so a single instance serves every application.
    final Consumer<Disposable> registerDisposable = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            AbstractPresenter.this.addDisposable(disposable);
        }
    };
    return new SingleTransformer<R, R>() {
        @Override
        public SingleSource<R> apply(@NonNull Single<R> upstream) {
            final Single<R> scheduled = upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            // A new RetryWithDelay is created each time the transformer is applied.
            final Single<R> retrying =
                    scheduled.retryWhen(new RetryWithDelay(maxRetry, todoBeforeRetry).forSingle);
            return retrying.doOnSubscribe(registerDisposable);
        }
    };
}
java类io.reactivex.SingleSource的实例源码
AbstractPresenter.java 文件源码
项目:AndroidMVPresenter
阅读 33
收藏 0
点赞 0
评论 0
LocationRepository.java 文件源码
项目:My-Android-Base-Code
阅读 44
收藏 0
点赞 0
评论 0
/**
 * Returns the device location, requesting a fresh fix only when
 * {@code shouldRequestNewLocation()} says so.
 *
 * <p>A fresh fix is cached through {@code setLocationCache}. If the request fails or exceeds
 * {@code LOCATION_REQUEST_TIMEOUT} milliseconds, the last known location is emitted when one
 * exists; otherwise a timeout becomes a {@code LocationTimeoutException} and any other
 * failure is passed through unchanged.
 *
 * @param request the location request forwarded to the fused location source
 * @return a {@link Single} emitting the fresh or last known location, or an error
 */
private Single<Location> getLocation(LocationRequest request) {
    if (shouldRequestNewLocation()) {
        return mFusedLocation.getLocation(request)
                .doOnSuccess(new Consumer<Location>() {
                    @Override
                    public void accept(Location freshLocation) throws Exception {
                        setLocationCache(freshLocation);
                    }
                })
                .timeout(LOCATION_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
                .onErrorResumeNext(new Function<Throwable, SingleSource<? extends Location>>() {
                    @Override
                    public SingleSource<? extends Location> apply(Throwable failure) throws Exception {
                        // A known location always wins over reporting the failure.
                        if (mLastLocation != null) {
                            return Single.just(mLastLocation);
                        }
                        if (failure instanceof TimeoutException) {
                            return Single.error(new LocationTimeoutException());
                        }
                        return Single.error(failure);
                    }
                });
    }
    return Single.just(mLastLocation);
}
ClientCallsRx.java 文件源码
项目:grpc-rx
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Creates a receiver for the given call and builds the {@code source} through which the
 * response is subscribed to. The call itself is started only when {@code source} is
 * subscribed.
 *
 * @param call the client call stored in this receiver
 */
public SingleResponseReceiver(ClientCall<?, RespT> call) {
this.call = call;
this.source = new SingleSource<RespT>() {
@Override
public void subscribe(SingleObserver<? super RespT> observer) {
// Keep the observer in the responseObserver field; the error branch below reads it back.
responseObserver = observer;
// TODO: decide which disposable should be used here; an already-disposed one is handed over for now.
observer.onSubscribe(Disposables.disposed());
// The call is started only once the response has been subscribed to.
startCall();
// If an error is already recorded when startCall() returns, deliver it once and clear it.
if (error != null) {
responseObserver.onError(error);
error = null;
}
}
};
}
Deck.java 文件源码
项目:Roach
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Counts the cards of the given type in this deck, summing the per-card quantities held in
 * {@code cardCount}.
 *
 * <p>The count is computed lazily, on each subscription. {@link Single#fromCallable} is used
 * so that the Single protocol is honoured ({@code onSubscribe} is signalled before
 * {@code onSuccess}) and any exception raised while counting reaches the subscriber through
 * {@code onError} instead of escaping from the subscribe call.
 *
 * @param type the card type to match against each card's {@code getType()}
 * @return a {@link Single} emitting the total quantity of cards of that type
 */
@Exclude
private Single<Integer> getCardCount(final String type) {
    return Single.fromCallable(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int count = 0;
            for (String cardId : cardCount.keySet()) {
                if (getCards().get(cardId).getType().equals(type)) {
                    count += cardCount.get(cardId);
                }
            }
            return count;
        }
    });
}
ArtistReleasesTransformer.java 文件源码
项目:DiscogsBrowser
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Keeps only the artist releases whose lower-cased title or year contains the filter text.
 *
 * @param upstream {@link Single} emitting the full list of {@link ArtistRelease}s.
 * @return {@link Single} emitting the releases that matched, in their original order.
 */
@Override
public SingleSource<List<ArtistRelease>> apply(@NonNull Single<List<ArtistRelease>> upstream) {
    return upstream
            .flattenAsObservable(allReleases -> allReleases)
            .filter(release -> {
                // The title is checked first; the year is only consulted when the title misses.
                if (release.getTitle().toLowerCase().contains(filterText)) {
                    return true;
                }
                return release.getYear().toLowerCase().contains(filterText);
            })
            .toList();
}
SchedulerSingleTransformer.java 文件源码
项目:redux-observable
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Returns a transformer that both subscribes and observes on
 * {@code Schedulers.trampoline()}.
 *
 * <p>The anonymous class is fully parameterized, so no raw types or unchecked cast are
 * needed.
 *
 * @param <T> type of the item carried by the stream
 * @return a transformer that pins the stream to the trampoline scheduler
 */
@Override public <T> SingleTransformer<T, T> transformer() {
    return new SingleTransformer<T, T>() {
        @Override public SingleSource<T> apply(Single<T> upstream) {
            return upstream.subscribeOn(Schedulers.trampoline())
                    .observeOn(Schedulers.trampoline());
        }
    };
}
SchedulerSingleTransformer.java 文件源码
项目:Architecture
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Returns a transformer that both subscribes and observes on
 * {@code Schedulers.trampoline()}.
 *
 * <p>The anonymous class is fully parameterized, so no raw types or unchecked cast are
 * needed.
 *
 * @param <T> type of the item carried by the stream
 * @return a transformer that pins the stream to the trampoline scheduler
 */
@Override public <T> SingleTransformer<T, T> transformer() {
    return new SingleTransformer<T, T>() {
        @Override public SingleSource<T> apply(Single<T> upstream) {
            return upstream.subscribeOn(Schedulers.trampoline())
                    .observeOn(Schedulers.trampoline());
        }
    };
}
DataManager.java 文件源码
项目:REDAndroid
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Loads the forum categories and flattens them into one display list: each category name is
 * followed by the forums that belong to it.
 *
 * <p>The conversion is synchronous, so it is expressed with {@code map} rather than
 * {@code flatMap} plus {@code Single.just}.
 *
 * @return a {@link Single} emitting the flattened list of category names (as returned by
 *         {@code categoryName}) and forum objects
 */
public Single<List<Object>> getCategories() {
    return mApiService.forumCategories().map(new Function<ForumCategory, List<Object>>() {
        @Override public List<Object> apply(ForumCategory forumCategory) {
            List<Object> items = new ArrayList<>();
            for (ForumCategory.Categories categories : forumCategory.response.categories) {
                // The category name acts as the header for the forums that follow it.
                items.add(categories.categoryName);
                for (ForumCategory.Forums forum : categories.forums) {
                    items.add(forum);
                }
            }
            return items;
        }
    });
}
LifecycleTransformer.java 文件源码
项目:LifecycleAwareRx
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Binds the upstream {@link Single} to the lifecycle filter.
 *
 * <p>NOTE: a Single cannot be empty, so when the item is filtered out the conversion back to
 * a Single fails with a NoSuchElementException; with a Single, {@code onError} is therefore
 * called after onDestroy().
 *
 * @param upstream the Single to transform
 * @return the cached and filtered Single, which is also passed to {@code setReactiveType}
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
    final Single<T> lifecycleBound = upstream
            // Cache so that late subscribers get the emitted value replayed.
            .cache()
            // Stop emitting once the LifecycleOwner is destroyed.
            .filter(filterIfDestroyedPredicate)
            .toSingle();
    setReactiveType((R) lifecycleBound);
    return lifecycleBound;
}
SearchActivity.java 文件源码
项目:YelpQL
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Fetches an authentication token, runs the Yelp search with it and shows the result.
 *
 * <p>The progress bar is shown while loading and hidden on both success and failure. An
 * offset of {@code 0} installs a new adapter; any other offset appends to the current one.
 *
 * @param searchTerm  the text to search for
 * @param location    supplies the latitude and longitude sent with the search
 * @param offsetValue the offset passed to the search
 */
private void loadData(final String searchTerm, final Location location, final int offsetValue) {
    final Function<String, SingleSource<List<Business>>> searchWithToken =
            new Function<String, SingleSource<List<Business>>>() {
                @Override
                public SingleSource<List<Business>> apply(@NonNull String authenticationToken) throws Exception {
                    return SearchAPI.searchYelp(authenticationToken, searchTerm,
                            location.getLatitude(), location.getLongitude(), offsetValue);
                }
            };
    final Consumer<List<Business>> showResults = new Consumer<List<Business>>() {
        @Override
        public void accept(@NonNull List<Business> businessList) throws Exception {
            progressbar.setVisibility(View.GONE);
            if (offsetValue != 0) {
                // Later page: extend the list that is already on screen.
                searchAdapter.addBusinessList(businessList);
                searchAdapter.notifyDataSetChanged();
                return;
            }
            searchAdapter = new SearchAdapter(businessList, SearchActivity.this, SearchActivity.this);
            rvNearbyRestaurant.setAdapter(searchAdapter);
        }
    };
    final Consumer<Throwable> showError = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            progressbar.setVisibility(View.GONE);
            Toast.makeText(SearchActivity.this, "Error " + throwable.getMessage(), Toast.LENGTH_LONG).show();
        }
    };

    progressbar.setVisibility(View.VISIBLE);
    // Get the authentication token first, then search with it.
    AuthenticationTokenUtil.fetchAndUpdateAuthenticationToken(this)
            .flatMap(searchWithToken)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showResults, showError);
}
BusinessDetailsActivity.java 文件源码
项目:YelpQL
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Fetches an authentication token, loads the business details with it and binds them to the
 * screen. The progress bar is shown while loading and hidden on both success and failure; a
 * failure is reported with a toast showing the throwable's message.
 *
 * @param businessID the identifier of the business to load
 * @param latitude   latitude forwarded to the details request
 * @param longitude  longitude forwarded to the details request
 */
private void loadRestaurantData(final String businessID, final double latitude, final double longitude) {
    final Function<String, SingleSource<Business>> detailsWithToken =
            new Function<String, SingleSource<Business>>() {
                @Override
                public SingleSource<Business> apply(@io.reactivex.annotations.NonNull String authToken) throws Exception {
                    return BusinessDetailsAPI.getBusinessDetails(authToken, businessID, latitude, longitude);
                }
            };
    final Consumer<Business> showBusiness = new Consumer<Business>() {
        @Override
        public void accept(@io.reactivex.annotations.NonNull Business business) throws Exception {
            progressbar.setVisibility(View.GONE);
            bindData(business);
        }
    };
    final Consumer<Throwable> showError = new Consumer<Throwable>() {
        @Override
        public void accept(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
            progressbar.setVisibility(View.GONE);
            Toast.makeText(BusinessDetailsActivity.this, throwable.getMessage(), Toast.LENGTH_LONG).show();
        }
    };

    progressbar.setVisibility(View.VISIBLE);
    AuthenticationTokenUtil.fetchAndUpdateAuthenticationToken(this)
            .flatMap(detailsWithToken)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showBusiness, showError);
}
RxUtils.java 文件源码
项目:CleanArchitecture
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Returns a transformer that subscribes on the IO scheduler and observes on the Android
 * main thread.
 *
 * @param <T> type of the item carried by the stream
 * @return a transformer intended for {@code Single.compose(...)}
 */
public static <T> SingleTransformer<T, T> applyCommonSchedulersSingle() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(@NonNull Single<T> upstream) {
            final Single<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
RxTask.java 文件源码
项目:rxtasks
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Creates a {@link Single} from a callable that produces a {@code Task}. The callable is
 * invoked on subscription and the resulting task is converted through the
 * {@code single(Task)} overload.
 *
 * @param callable supplies the task to wrap
 * @param <R>      type of the task's result
 * @return a Single built from the task that {@code callable} returns
 */
@CheckReturnValue
@NonNull
public static <R> Single<R> single(@NonNull final Callable<Task<R>> callable) {
    final Single<Task<R>> deferredTask = Single.fromCallable(callable);
    return deferredTask.flatMap(new Function<Task<R>, SingleSource<? extends R>>() {
        @Override
        public SingleSource<? extends R> apply(Task<R> createdTask) throws Exception {
            return single(createdTask);
        }
    });
}
LocationDataSource.java 文件源码
项目:black-mirror
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Returns the time zone for the given location.
 *
 * <p>The location is first resolved to coordinates through the Google geocoding API; the
 * latitude and longitude of the first result are then sent to the time zone API.
 *
 * @param location the location name - a city, country or village.
 * @return a {@link Single} emitting the time zone returned for the first geocoding result
 */
@Override
public Single<TimeZone> getTimeZoneByLocationName(String location) {
    final Function<CoordResponse, SingleSource<? extends TimeZone>> coordsToTimeZone =
            new Function<CoordResponse, SingleSource<? extends TimeZone>>() {
                @Override
                public SingleSource<? extends TimeZone> apply(@NonNull CoordResponse response) throws Exception {
                    // Only the first geocoding result is used.
                    final String latitude = response.results.get(0).geometry.location.lat.toString();
                    final String longitude = response.results.get(0).geometry.location.lng.toString();
                    return timeZoneDbApi.getTimeZone(latitude, longitude, TIME_ZONE_DB_API_KEY);
                }
            };
    return googleGeoApi.getCoordForLocation(location, GOOGLE_GEO_API_KEY)
            .flatMap(coordsToTimeZone);
}
SingleTransformerOfClazz.java 文件源码
项目:RxFirebase2
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Maps each {@code DataSnapshot} to an {@code Optional} holding the value read with
 * {@code clazz}.
 *
 * @param upstream the Single emitting the snapshot
 * @return a Single emitting the snapshot's value wrapped via {@code Optional.of}
 */
@Override public SingleSource<Optional<T>> apply(Single<DataSnapshot> upstream) {
    // NOTE(review): Optional.of is used as-is; confirm how it treats a null snapshot value.
    final Function<DataSnapshot, Optional<T>> toOptional = new Function<DataSnapshot, Optional<T>>() {
        @Override public Optional<T> apply(DataSnapshot snapshot) throws Exception {
            return Optional.of(snapshot.getValue(clazz));
        }
    };
    return upstream.map(toOptional);
}
SingleTransformerOfGenericTypeIndicator.java 文件源码
项目:RxFirebase2
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Maps each {@code DataSnapshot} to an {@code Optional} holding the value read with
 * {@code typeIndicator}.
 *
 * @param upstream the Single emitting the snapshot
 * @return a Single emitting the snapshot's value wrapped via {@code Optional.of}
 */
@Override public SingleSource<Optional<T>> apply(Single<DataSnapshot> upstream) {
    // NOTE(review): Optional.of is used as-is; confirm how it treats a null snapshot value.
    final Function<DataSnapshot, Optional<T>> toOptional = new Function<DataSnapshot, Optional<T>>() {
        @Override public Optional<T> apply(DataSnapshot snapshot) throws Exception {
            return Optional.of(snapshot.getValue(typeIndicator));
        }
    };
    return upstream.map(toOptional);
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Search.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * resulting (sed, sig) pair is then passed to the search endpoint. The shared
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param recherche the search text; spaces are replaced with {@code +} for signing
 * @param filter    the filters to apply
 * @param count     value sent as the count parameter
 * @param page      value sent as the page parameter
 * @return a {@link Single} emitting the raw search response
 */
public Single<AllocineResponse> search(final String recherche, final List<String> filter, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends AllocineResponse>> callService =
            new Function<Pair<String, String>, SingleSource<? extends AllocineResponse>>() {
                @Override
                public SingleSource<? extends AllocineResponse> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.search(recherche, ServiceSecurity.applatir(filter), count, page, signed.first, signed.second);
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<AllocineResponse>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Search, returning the small response variant.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * resulting (sed, sig) pair is then passed to the {@code searchSmall} endpoint. The shared
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param recherche the search text; spaces are replaced with {@code +} for signing
 * @param filter    the filters to apply
 * @param count     value sent as the count parameter
 * @param page      value sent as the page parameter
 * @return a {@link Single} emitting the small search response
 */
public Single<AllocineResponseSmall> searchSmall(final String recherche, final List<String> filter, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<AllocineResponseSmall>> callService =
            new Function<Pair<String, String>, SingleSource<AllocineResponseSmall>>() {
                @Override
                public SingleSource<AllocineResponseSmall> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.searchSmall(recherche, ServiceSecurity.applatir(filter), count, page, signed.first, signed.second);
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<AllocineResponseSmall>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Information about a movie.
 *
 * <p>On subscription the request parameters (code, profile and the {@code FILTER_MOVIE}
 * filter) are signed through {@code ServiceSecurity}; the movie endpoint is then called and
 * the {@code Movie} is extracted from the response. The shared {@code retry()} transformer
 * is applied to the whole chain.
 *
 * @param idFilm  the movie code
 * @param profile the profile whose value is sent with the request
 * @return a {@link Single} emitting the movie taken from the response
 */
public Single<Movie> movie(final String idFilm, final Profile profile) {
    final String movieFilter = FILTER_MOVIE;
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idFilm,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.FILTER, movieFilter
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends Movie>> callService =
            new Function<Pair<String, String>, SingleSource<? extends Movie>>() {
                @Override
                public SingleSource<? extends Movie> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.movie(idFilm, profile.getValue(), movieFilter, signed.first, signed.second)
                            .map(new Function<AllocineResponse, Movie>() {
                                @Override
                                public Movie apply(AllocineResponse response) throws Exception {
                                    return response.getMovie();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<Movie>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Information about a theater.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * theater endpoint is then called and the {@code Theater} is extracted from the response.
 * The shared {@code retry()} transformer is applied to the whole chain.
 *
 * @param idCinema the theater code
 * @param profile  value sent as the profile parameter
 * @param filter   value sent as the filter parameter
 * @return a {@link Single} emitting the theater taken from the response
 */
public Single<Theater> theater(final String idCinema, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idCinema,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends Theater>> callService =
            new Function<Pair<String, String>, SingleSource<? extends Theater>>() {
                @Override
                public SingleSource<? extends Theater> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.theater(idCinema, profile, filter, signed.first, signed.second)
                            .map(new Function<AllocineResponse, Theater>() {
                                @Override
                                public Theater apply(AllocineResponse response) throws Exception {
                                    return response.getTheater();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<Theater>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Information about a person.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * person endpoint is then called and the {@code PersonFull} is extracted from the response.
 * The shared {@code retry()} transformer is applied to the whole chain.
 *
 * @param idPerson the person code
 * @param profile  value sent as the profile parameter
 * @param filter   value sent as the filter parameter
 * @return a {@link Single} emitting the person taken from the response
 */
public Single<PersonFull> person(final String idPerson, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idPerson,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<PersonFull>> callService =
            new Function<Pair<String, String>, SingleSource<PersonFull>>() {
                @Override
                public SingleSource<PersonFull> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.person(idPerson, profile, filter, signed.first, signed.second)
                            .map(new Function<AllocineResponse, PersonFull>() {
                                @Override
                                public PersonFull apply(AllocineResponse response) throws Exception {
                                    return response.getPerson();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<PersonFull>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Filmography of a person.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * filmography endpoint is then called and the person's participations are extracted from
 * the response. The shared {@code retry()} transformer is applied to the whole chain.
 *
 * @param idPerson the person code
 * @param profile  value sent as the profile parameter
 * @param filter   value sent as the filter parameter
 * @return a {@link Single} emitting the participations of the person in the response
 */
public Single<List<Participation>> filmography(final String idPerson, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idPerson,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<List<Participation>>> callService =
            new Function<Pair<String, String>, SingleSource<List<Participation>>>() {
                @Override
                public SingleSource<List<Participation>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.filmography(idPerson, profile, filter, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<Participation>>() {
                                @Override
                                public List<Participation> apply(AllocineResponse response) throws Exception {
                                    return response.getPerson().getParticipation();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<Participation>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Lists movies matching the given filters.
 *
 * <p>The filter values are collected eagerly, when this method is called. On subscription
 * the request parameters are signed through {@code ServiceSecurity}; the movie-list endpoint
 * is then called and the movies are extracted from the response feed. The shared
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param filter  the filters whose values are sent with the request
 * @param profile the profile whose value is sent with the request
 * @param order   the ordering whose value is sent with the request
 * @param count   value sent as the count parameter
 * @param page    value sent as the page parameter
 * @return a {@link Single} emitting the movies of the response feed
 */
public Single<List<Movie>> movieList(List<MovieListFilter> filter, final Profile profile, final MovieListOrder order, final int count, final int page) {
    final List<String> filterValues = new ArrayList<>();
    for (MovieListFilter each : filter) {
        filterValues.add(each.getValue());
    }
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(true,
                    AllocineService.FILTER, filterValues,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.ORDER, order.getValue(),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Movie>>> callService =
            new Function<Pair<String, String>, SingleSource<? extends List<Movie>>>() {
                @Override
                public SingleSource<? extends List<Movie>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.movieList(ServiceSecurity.applatir(filterValues), profile.getValue(), order.getValue(), count, page, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<Movie>>() {
                                @Override
                                public List<Movie> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getMovie();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<Movie>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Lists persons matching the given filters.
 *
 * <p>The filter values are collected eagerly, when this method is called. On subscription
 * the request parameters are signed through {@code ServiceSecurity}; the person-list
 * endpoint is then called and the persons are extracted from the response feed. The shared
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param filter  the filters whose values are sent with the request
 * @param profile the profile whose value is sent with the request
 * @param count   value sent as the count parameter
 * @param page    value sent as the page parameter
 * @return a {@link Single} emitting the persons of the response feed
 */
public Single<List<PersonFull>> starsList(final List<PersonListFilter> filter, final Profile profile, final int count, final int page) {
    final List<String> filterValues = new ArrayList<>();
    for (PersonListFilter each : filter) {
        filterValues.add(each.getValue());
    }
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(true,
                    AllocineService.FILTER, filterValues,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<PersonFull>>> callService =
            new Function<Pair<String, String>, SingleSource<? extends List<PersonFull>>>() {
                @Override
                public SingleSource<? extends List<PersonFull>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.personList(ServiceSecurity.applatir(filterValues), profile.getValue(), count, page, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<PersonFull>>() {
                                @Override
                                public List<PersonFull> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getPerson();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<PersonFull>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Lists theaters for a zip code.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * theater-list endpoint is then called and the theaters are extracted from the response
 * feed. The shared {@code retry()} transformer is applied to the whole chain.
 *
 * @param zip   value sent as the zip parameter
 * @param count value sent as the count parameter
 * @param page  value sent as the page parameter
 * @return a {@link Single} emitting the theaters of the response feed
 */
public Single<List<Theater>> theaterList(final String zip, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.ZIP, zip,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Theater>>> callService =
            new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.theaterlist(zip, count, page, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getTheater();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<Theater>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Lists theaters around a coordinate.
 *
 * <p>On subscription the request parameters are signed through {@code ServiceSecurity}; the
 * theater-list endpoint is then called and the theaters are extracted from the response
 * feed. The shared {@code retry()} transformer is applied to the whole chain.
 *
 * @param lat    value sent as the latitude parameter
 * @param lng    value sent as the longitude parameter
 * @param radius value sent as the radius parameter
 * @param count  value sent as the count parameter
 * @param page   value sent as the page parameter
 * @return a {@link Single} emitting the theaters of the response feed
 */
public Single<List<Theater>> theaterList(final String lat, final String lng, final int radius, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.LAT, lat,
                    AllocineService.LONG, lng,
                    AllocineService.RADIUS, String.valueOf(radius),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Theater>>> callService =
            new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.theaterlist(lat, lng, radius, count, page, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getTheater();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<Theater>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Lists the videos attached to a movie.
 *
 * <p>The subject is {@code "movie:"} followed by the code, and the media format is
 * {@code "mp4"}. On subscription the request parameters are signed through
 * {@code ServiceSecurity}; the video-list endpoint is then called and the media entries are
 * extracted from the response feed. The shared {@code retry()} transformer is applied to
 * the whole chain.
 *
 * @param code  the movie code
 * @param count value sent as the count parameter
 * @return a {@link Single} emitting the media entries of the response feed
 */
public Single<List<Media>> videoList(final String code, final int count) {
    final String movieSubject = "movie:" + code;
    final String mediaFormat = "mp4";
    final SingleOnSubscribe<Pair<String, String>> signRequest = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.SUBJECT, movieSubject,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.MEDIAFMT, mediaFormat
            );
            final String sed = ServiceSecurity.getSED();
            final String signature = ServiceSecurity.getSIG(params, sed);
            emitter.onSuccess(Pair.create(sed, signature));
        }
    };
    final Function<Pair<String, String>, SingleSource<List<Media>>> callService =
            new Function<Pair<String, String>, SingleSource<List<Media>>>() {
                @Override
                public SingleSource<List<Media>> apply(Pair<String, String> signed) throws Exception {
                    return allocineService.videoList(movieSubject, count, mediaFormat, signed.first, signed.second)
                            .map(new Function<AllocineResponse, List<Media>>() {
                                @Override
                                public List<Media> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getMedia();
                                }
                            });
                }
            };
    return Single.create(signRequest)
            .flatMap(callService)
            .compose(this.<List<Media>>retry());
}
AllocineApi.java 文件源码
项目:Android-Allocine-Api
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that re-subscribes to the upstream after an HTTP 403 response.
 *
 * <p>Up to {@code MAX_COUNT} failures are considered per subscription. A failure that is an
 * {@code HttpException} with code 403 triggers a retry after {@code DELAY_SECOND} seconds;
 * any other failure, or a failure once the budget is spent, is propagated downstream.
 *
 * <p>The attempt counter lives in the function created inside the {@code retryWhen}
 * handler. The handler is invoked for each subscription, so every subscription starts with
 * a fresh budget instead of sharing one counter across all subscriptions of the same
 * composed {@link Single}.
 *
 * @param <T> type of the item carried by the stream
 * @return a transformer intended for {@code Single.compose(...)}
 */
private <T> SingleTransformer<T, T> retry() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.retryWhen(new Function<Flowable<Throwable>, Publisher<Object>>() {
                private static final int MAX_COUNT = 3;
                private static final int DELAY_SECOND = 10;

                @Override
                public Publisher<Object> apply(Flowable<Throwable> throwableFlowable) throws Exception {
                    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
                        // Scoped to this handler invocation, i.e. to a single subscription.
                        private int count = 0;

                        @Override
                        public Publisher<?> apply(Throwable throwable) throws Exception {
                            // The counter advances on every failure, whatever its type.
                            if (count++ < MAX_COUNT && throwable instanceof HttpException) {
                                final HttpException httpException = (HttpException) throwable;
                                if (httpException.code() == 403) {
                                    return Flowable.timer(DELAY_SECOND, TimeUnit.SECONDS);
                                }
                            }
                            return Flowable.error(throwable);
                        }
                    });
                }
            });
        }
    };
}
AbstractPresenter.java 文件源码
项目:Open-Mam
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Builds the transformer this presenter applies to {@link Single} streams: subscribe on the
 * IO scheduler, observe on the Android main thread, and forward the subscription's
 * disposable to the presenter's {@code call} method.
 *
 * @param <R> type of the item carried by the stream
 * @return a transformer intended for {@code Single.compose(...)}
 */
public <R> SingleTransformer<? super R, ? extends R> compose() {
    return new SingleTransformer<R, R>() {
        @Override
        public SingleSource<R> apply(@NonNull Single<R> upstream) {
            final Single<R> scheduled = upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            return scheduled.doOnSubscribe(disposable -> AbstractPresenter.this.call(disposable));
        }
    };
}
MainObserverTransformer.java 文件源码
项目:pandroid
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Moves observation to the Android main thread and, when a lifecycle provider is set, also
 * composes the stream with {@code RxLifecycleDelegate.bindLifecycle(provider)}.
 *
 * @param upstream the Single to transform
 * @return the main-thread Single, lifecycle-bound when {@code provider} is not null
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
    final Single<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
    if (provider != null) {
        return onMainThread.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
    }
    return onMainThread;
}