/**
 * Starts a search for {@code tag} and delivers the outcome to {@code mObserver}.
 *
 * <p>Before the request is issued the hint text is emptied, the tag layout is hidden and the
 * progress bar is shown. The tag is remembered in {@code mSearchTag}. The request is
 * subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param tag the tag to search for
 */
private void searchData(String tag) {
    // Put the screen into its "searching" state before the request starts.
    mHintText.setText("");
    mTagLayout.setVisibility(View.GONE);
    mProgressBar.setVisibility(View.VISIBLE);

    mSearchTag = tag;

    // Only non-null payloads are allowed to reach the observer.
    final Predicate<GetDataBean> hasPayload = new Predicate<GetDataBean>() {
        @Override
        public boolean test(@NonNull GetDataBean getDataBean) throws Exception {
            return getDataBean != null;
        }
    };

    mSearchApi.searchTagData(mSearchTag)
            .filter(hasPayload)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
java类io.reactivex.Observable的实例源码
SearchActivity.java 文件源码
项目:MyEyepetizer
阅读 30
收藏 0
点赞 0
评论 0
AuthService.java 文件源码
项目:Ghost-Android
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Revokes both halves of {@code token}: the refresh token first and, once that revocation
 * completes, the access token.
 *
 * @param token        token pair to revoke; its auth header is sent with both requests
 * @param clientSecret client secret placed into both revocation request bodies
 * @return the subject that is handed to both {@code revokeSingleToken} calls
 */
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
    // The two revocations are chained instead of run side by side because the access token
    // authorizes BOTH requests, so it has to be the last one revoked.
    Subject<JsonElement> revocationResults = PublishSubject.create();

    RevokeReqBody refreshTokenBody =
            RevokeReqBody.fromRefreshToken(token.getRefreshToken(), clientSecret);
    revokeSingleToken(token.getAuthHeader(), refreshTokenBody, revocationResults)
            .doOnComplete(() -> {
                // The refresh-token revocation completed; the access token can go now.
                RevokeReqBody accessTokenBody =
                        RevokeReqBody.fromAccessToken(token.getAccessToken(), clientSecret);
                revokeSingleToken(token.getAuthHeader(), accessTokenBody, revocationResults)
                        .subscribe();
            })
            .subscribe();

    return revocationResults;
}
Ch4_3.java 文件源码
项目:Learning-RxJava
阅读 48
收藏 0
点赞 0
评论 0
/**
 * Merges five two-item observables, passed as a list, and prints every received item
 * prefixed with {@code "RECEIVED: "}.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Observable.merge accepts the sources as one Iterable, so they are built in place.
    List<Observable<String>> letterPairs = Arrays.asList(
            Observable.just("Alpha", "Beta"),
            Observable.just("Gamma", "Delta"),
            Observable.just("Epsilon", "Zeta"),
            Observable.just("Eta", "Theta"),
            Observable.just("Iota", "Kappa"));

    Observable.merge(letterPairs)
            .subscribe(letter -> System.out.println("RECEIVED: " + letter));
}
ProductDetailsPresenter.java 文件源码
项目:GitHub
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Connects the view's intents to the interactor.
 *
 * <p>The add-to-cart and remove-from-cart intents are logged, forwarded to the interactor and
 * subscribed directly. The load-details intent is logged, mapped through
 * {@code interactor.getDetails}, observed on the Android main thread and rendered through
 * {@code subscribeViewState}.
 */
@Override protected void bindIntents() {
    intent(ProductDetailsView::addToShoppingCartIntent)
        .doOnNext(toAdd -> Timber.d("intent: add to shopping cart %s", toAdd))
        .flatMap(toAdd -> interactor.addToShoppingCart(toAdd).toObservable())
        .subscribe();

    intent(ProductDetailsView::removeFromShoppingCartIntent)
        .doOnNext(toRemove -> Timber.d("intent: remove from shopping cart %s", toRemove))
        .flatMap(toRemove -> interactor.removeFromShoppingCart(toRemove).toObservable())
        .subscribe();

    // Only this stream produces view state.
    Observable<ProductDetailsViewState> detailsStates =
        intent(ProductDetailsView::loadDetailsIntent)
            .doOnNext(requestedId ->
                Timber.d("intent: load details for product id = %s", requestedId))
            .flatMap(interactor::getDetails)
            .observeOn(AndroidSchedulers.mainThread());

    subscribeViewState(detailsStates, ProductDetailsView::render);
}
Modern_Testing.java 文件源码
项目:Reactive-Programming-With-Java-9
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Verifies that {@code Observable.just(12, 34, 6)} delivers its three items in order and then
 * completes without an error.
 */
@Test
public void test_just_new() {
    Observable<Integer> observable = Observable.just(12, 34, 6);
    TestObserver<Integer> testObserver = new TestObserver<>();
    observable.subscribe(testObserver);

    testObserver.assertComplete();
    testObserver.assertResult(12, 34, 6);
    testObserver.assertValueCount(3);
    testObserver.assertNoErrors();
    // Indices are zero-based: position 2 holds the last item (6), not 34.
    testObserver.assertValueAt(2, value -> value == 6);
}
Modern_Testing.java 文件源码
项目:Reactive-Programming-With-Java-9
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Drives a one-second {@code interval}, limited to five items, with a {@code TestScheduler}
 * and checks what has been emitted at each point in virtual time.
 */
@Test
public void test_interval() {
    TestScheduler virtualClock = new TestScheduler();
    TestObserver<Long> ticks = new TestObserver<>();

    Observable.interval(1, TimeUnit.SECONDS, virtualClock)
            .take(5)
            .subscribeOn(virtualClock)
            .subscribe(ticks);

    // Virtual time has not moved yet, so nothing may have been observed.
    ticks.assertNoValues();
    ticks.assertNotComplete();
    ticks.assertNoErrors();

    // After one virtual second exactly the first tick is expected.
    virtualClock.advanceTimeBy(1, TimeUnit.SECONDS);
    ticks.assertValueCount(1);
    ticks.assertValues(0L);

    // Moving the clock to the 6-second mark must have produced all five ticks.
    virtualClock.advanceTimeTo(6, TimeUnit.SECONDS);
    ticks.assertValueCount(5);
    ticks.assertValues(0L, 1L, 2L, 3L, 4L);
}
RxQuery.java 文件源码
项目:ObjectBoxRxJava
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Wraps a {@link Query} in an Observable that emits the query results as Lists.
 * The Observable never completes, so subscribers keep receiving updates when the underlying
 * data changes. Disposing the subscription cancels the underlying data subscription.
 *
 * @param query the query to observe
 * @return an Observable emitting the query's result lists
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            // Forwards each result list downstream unless the emitter is already disposed.
            final DataObserver<List<T>> forwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription subscription = query.subscribe().observer(forwarder);

            // Tie the data subscription's lifetime to the Rx subscription.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    });
}
CommonTasks.java 文件源码
项目:buckaroo
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Downloads a remote archive and unpacks it into {@code targetDirectory}.
 *
 * <p>The archive is first downloaded to a sibling of {@code targetDirectory} named
 * {@code <directory name>.zip}. After the download completes, the zip is unpacked into
 * {@code targetDirectory} (taking {@code remoteArchive.subPath} into account), replacing
 * existing files. The whole sequence is subscribed on the IO scheduler.
 *
 * @param fs              file system handed to the download task
 * @param remoteArchive   archive to fetch
 * @param targetDirectory directory the archive is unpacked into
 * @return the download task's events, followed by the unpack step
 * @throws NullPointerException if any argument is {@code null}
 */
public static Observable<Event> downloadRemoteArchive(final FileSystem fs, final RemoteArchive remoteArchive, final Path targetDirectory) {
    Preconditions.checkNotNull(fs);
    Preconditions.checkNotNull(remoteArchive);
    Preconditions.checkNotNull(targetDirectory);

    // resolveSibling gives the same result as getParent().resolve(...) when a parent exists,
    // but also works for a single-name relative path, whose getParent() is null.
    final Path zipFilePath = targetDirectory.resolveSibling(targetDirectory.getFileName() + ".zip");

    return Observable.concat(
        // Download the file
        CommonTasks.downloadRemoteFile(fs, remoteArchive.asRemoteFile(), zipFilePath),
        // Unpack the zip
        MoreCompletables.fromRunnable(() -> {
            EvenMoreFiles.unzip(
                zipFilePath,
                targetDirectory,
                remoteArchive.subPath,
                StandardCopyOption.REPLACE_EXISTING);
        }).toObservable()).subscribeOn(Schedulers.io());
}
ModuleCall.java 文件源码
项目:android-arch-mvvm
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Starts this call and registers {@code callback} for its result.
 *
 * <p>A call may be enqueued only once. If the call is already canceled or done, nothing is
 * subscribed and the callback is not stored. Otherwise the wrapped source is subscribed
 * according to its reactive type (Observable, Single, Flowable; anything else is treated as
 * a Maybe).
 *
 * @param callback receiver stored in {@code mModuleCallback}
 * @throws IllegalStateException if this call has been enqueued before
 */
public void enqueue(final ModuleCallback<T> callback) {
    // Claim the one-shot "executed" flag under the lock.
    synchronized (this) {
        if (mExecuted) {
            throw new IllegalStateException("每个ModuleCall只能enqueue一次");
        }
        mExecuted = true;
    }

    final boolean alreadyFinished = mCanceled || mDone;
    if (alreadyFinished) {
        return;
    }

    mModuleCallback = callback;

    // Dispatch on the concrete reactive type of the wrapped source.
    if (mObservable instanceof Observable) {
        subscribeObservable((Observable<T>) mObservable);
        return;
    }
    if (mObservable instanceof Single) {
        subscribeSingle((Single<T>) mObservable);
        return;
    }
    if (mObservable instanceof Flowable) {
        subscribeFlowable((Flowable<T>) mObservable);
        return;
    }
    subscribeMaybe((Maybe<T>) mObservable);
}
MainActivity.java 文件源码
项目:rx-progress-dialog
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Sets up the screen: inflates the layout, prepares the Observable and Flowable "login"
 * sources, creates the disposable container and registers this activity as click listener
 * for both buttons.
 *
 * <p>Each source fires once after {@code TIME_DELAY} milliseconds on the Android main thread
 * and maps that tick to a string containing a random UUID.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    mLoginObservable = Observable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());

    mLoginFlowable = Flowable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());

    mCompositeDisposable = new CompositeDisposable();

    findViewById(R.id.button_observable).setOnClickListener(this);
    findViewById(R.id.button_flowable).setOnClickListener(this);
}
MainActivity.java 文件源码
项目:RxEasyHttp
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Calls a custom API through EasyHttp and shows the outcome in a toast.
 *
 * <p>Note: when request signing is enabled, pay attention to the "/" in the path:
 * <pre>
 * https://www.xxx.com/v1/account/login   (correct)
 * https://www.xxx.com//v1/account/login  (wrong - may cause the signature check to fail)
 * </pre>
 *
 * @param view the view that triggered the call
 */
public void onCustomCall(View view) {
    final String account = "18688994275";
    final String password = "123456";

    final CustomRequest customRequest = EasyHttp.custom()
            .addConverterFactory(GsonConverterFactory.create(new Gson()))
            .sign(true)
            .timeStamp(true)
            .params(ComParamContact.Login.ACCOUNT, account)
            .params(ComParamContact.Login.PASSWORD, MD5.encrypt4login(password, AppConstant.APP_SECRET))
            .build();

    LoginService loginService = customRequest.create(LoginService.class);
    Observable<ApiResult<AuthModel>> loginCall = customRequest.call(
            loginService.login("v1/account/login", customRequest.getParams().urlParamsMap));

    // Success path: show the raw result.
    final Consumer<ApiResult<AuthModel>> onResult = new Consumer<ApiResult<AuthModel>>() {
        @Override
        public void accept(@NonNull ApiResult<AuthModel> result) throws Exception {
            showToast(result.toString());
        }
    };
    // Failure path: show the error message.
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            showToast(throwable.getMessage());
        }
    };

    Disposable disposable = loginCall.subscribe(onResult, onFailure);
    //EasyHttp.cancelSubscription(disposable);// cancel the subscription
}
ClientsNetworkCalls.java 文件源码
项目:GSB-2017-Android
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Requests all clients and parses the JSON response into a list.
 *
 * <p>A {@code null} element yields an empty list, a JSON array is parsed with
 * {@code Client.ClientsListParser}, and any other JSON element produces an error. The
 * result is observed on the Android main thread.
 *
 * @return an observable emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    ClientsService clientsService = ServiceGenerator.createService(ClientsService.class);

    final Function<JsonElement, Observable<List<Client>>> parseClients =
            new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }

                    Log.i("Get All Clients" , "JSON: "+jsonElement.toString());

                    if (!jsonElement.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }

                    List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(jsonElement.getAsJsonArray());
                    return Observable.just(parsed);
                }
            };

    return clientsService.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(parseClients)
            .observeOn(AndroidSchedulers.mainThread());
}
LifecycleTest.java 文件源码
项目:LifecycleAwareRx
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Shows the problem this library addresses: a plain subscription calls into the "view" even
 * though the lifecycle has only reached ON_CREATE.
 *
 * <p>The interval subscription is disposed when the test ends, so it does not keep emitting
 * on a background thread after the test has finished.
 */
@Test
public void viewsAreCalledBeforeLifecycleIsReadyWithoutLifecycleAwareRx() throws Exception {
    // Lifecycle is "active" once it is STARTED, it's not ready yet at INITIALIZED or CREATED.
    lifecycleOwner.handleLifecycleEvent(Lifecycle.Event.ON_CREATE);

    final DisposableObserver<Long> observer = Observable.interval(1, TimeUnit.MILLISECONDS)
            .subscribeWith(new DisposableObserver<Long>() {
                @Override
                public void onNext(final Long value) {
                    LifecycleTest.this.methodOnViewCalled = true;
                }

                @Override
                public void onError(final Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });

    try {
        // Need to wait to give it time to potentially fail
        TimeUnit.MILLISECONDS.sleep(100);
        assertEquals(true, methodOnViewCalled);
    } finally {
        // The interval never completes on its own; stop it explicitly.
        observer.dispose();
    }
}
IconShowPresenter.java 文件源码
项目:MBEStyle
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Loads the "what's new" icons and hands them to the view.
 *
 * <p>Every name in the {@code R.array.whatsnew} string array is turned into an
 * {@code IconBean} whose id is the drawable identifier looked up by that name. The beans are
 * collected into a list on the IO scheduler and delivered to {@code mView.onLoadData} on
 * the Android main thread.
 *
 * @return the disposable of the running subscription
 */
public Disposable getWhatsNewIcons() {
    final String[] iconNames = mView.getResources().getStringArray(R.array.whatsnew);

    // Resolves a drawable name into a bean carrying both the name and its resource id.
    final Function<String, IconBean> toIconBean = new Function<String, IconBean>() {
        @Override
        public IconBean apply(@NonNull String s) throws Exception {
            IconBean icon = new IconBean();
            icon.id = mView.getResources().getIdentifier(s, "drawable", BuildConfig.APPLICATION_ID);
            icon.name = s;
            return icon;
        }
    };

    final Consumer<List<IconBean>> deliverToView = new Consumer<List<IconBean>>() {
        @Override
        public void accept(List<IconBean> list) throws Exception {
            mView.onLoadData(list);
        }
    };

    return Observable.fromArray(iconNames)
            .map(toIconBean)
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(deliverToView);
}
RxJava2CombineTest.java 文件源码
项目:RxFamilyUsage-Android
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Checks that {@code startWith} places its argument in front of the items from {@code nums}:
 * both streams below are expected to emit 7 values, with -1 as the first one.
 */
@Test
public void startWithTest() throws Exception {
// Variant 1: prepend a single value.
Observable.fromArray(nums)
.startWith(-1)
.test()
.assertValueCount(7)
.assertValueAt(0, -1);
// It can also be done this way: prepend another Observable.
Observable.fromArray(nums)
.startWith(Observable.just(-1))
.test()
.assertValueCount(7)
.assertValueAt(0, -1);
}
FilterOperatorFragment.java 文件源码
项目:Rx-Android-Samples
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Binds the list and log views, then loads the repositories of {@code Utils.USER} and passes
 * only the ones whose language is "Java" to {@code mBaseObserver}.
 *
 * <p>Repositories without a language (a {@code null} {@code language} field) are filtered
 * out rather than causing a {@link NullPointerException}.
 */
@Override
public void onViewCreated(View view, Bundle savedInstanceState) {
    super.onViewCreated(view, savedInstanceState);
    mRepoListView = view.findViewById(R.id.repo_list_view);
    mObserverLog = view.findViewById(R.id.observer_log);
    mObserverLog.setMovementMethod(new ScrollingMovementMethod());

    mApi.getObservableRepositories(Utils.USER)
            // Flatten the single list emission into one emission per repository.
            .flatMap(new Function<List<RepositoryResponse>, ObservableSource<RepositoryResponse>>() {
                @Override
                public ObservableSource<RepositoryResponse> apply(List<RepositoryResponse> repositoryResponses) throws Exception {
                    return Observable.fromIterable(repositoryResponses);
                }
            })
            .filter(new Predicate<RepositoryResponse>() {
                @Override
                public boolean test(RepositoryResponse repositoryResponse) throws Exception {
                    // Filter the stream so only Java repositories are emitted. The constant
                    // comes first so that a null language is rejected instead of throwing.
                    return "Java".equals(repositoryResponse.language);
                }
            })
            //Subscribe the Network call in io Thread.
            .subscribeOn(Schedulers.io())
            //Subscribe the Observer in MainThread so it can updates the UI with the result.
            .observeOn(AndroidSchedulers.mainThread())
            //Choose the subscribed Observer for items emitted by this observable.
            .subscribe(mBaseObserver);
}
CombineLatestExampleActivity.java 文件源码
项目:RxJava2-Android-Sample
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Combines the latest items of two string streams ("A1".."A4" and "B1".."B3") into
 * {@code "<a>-<b>"} strings and subscribes the observer from {@code getObserver()}.
 */
private void doSomeWork() {
    final Observable<String> left = Observable.fromArray("A1", "A2", "A3", "A4");
    final Observable<String> right = Observable.fromArray("B1", "B2", "B3");

    // Joins the most recent item of each side with a dash.
    final BiFunction<String, String, String> joinWithDash = new BiFunction<String, String, String>() {
        @Override
        public String apply(@NonNull String s, @NonNull String s2) throws Exception {
            return s + "-" + s2;
        }
    };

    Observable.combineLatest(left, right, joinWithDash).subscribe(getObserver());
}
NaviSetLinePresenter.java 文件源码
项目:AssistantBySDK
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Called when yaw (off-route) re-planning has finished.
 *
 * <p>Queues a spoken announcement when the synthesizer is initialized, clears the previously
 * stored route plans and route models, and then stores the freshly planned routes under
 * {@code calculatePreference} (only when at least one route exists).
 */
@Override
public void onYawingRequestSuccess() {
    Log.i(TAG, "onYawingRequestSuccess");

    if (SynthesizerBase.isInited()) {
        SpeechMsgBuilder speechBuilder = new SpeechMsgBuilder("路线规划完毕")
                .setForceLocalEngine(true)
                .setOrigin(SpeechMsg.ORIGIN_COMMON);
        Observable<SpeechMsg> spoken =
                SynthesizerBase.get().addMessageWaitSpeak(speechBuilder.build());
        if (spoken != null) {
            spoken.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .subscribe();
        }
    }

    // Throw away whatever the previous planning round left behind.
    BNRoutePlanerProxy.getInstance().routePlans.clear();
    RouteModel.getLastRouteModels().clear();

    Vector<RoutePlanModelProxy> planProxies = new Vector<RoutePlanModelProxy>();
    ArrayList<RouteModel> routeModels = new ArrayList<RouteModel>();
    int routeCount = BNRoutePlanerProxy.getInstance().getRouteCnt();
    if (routeCount <= 0) {
        return;
    }

    // Read every planned route into both representations.
    for (int index = 0; index < routeCount; index++) {
        Bundle routeInfo = new Bundle();
        RoutePlanModelProxy planProxy = new RoutePlanModelProxy();
        BNRoutePlanerProxy.getInstance().getRouteInfo(index, routeInfo);
        planProxy.parseRouteResult(mContext, routeInfo);
        routeModels.add(new RouteModel(routeInfo));
        planProxies.add(planProxy);
    }

    RouteModel.put(calculatePreference, routeModels);
    BNRoutePlanerProxy.getInstance().routePlans.put(calculatePreference, planProxies);
}
EmoticonDrawable.java 文件源码
项目:GifEmoji
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Starts the frame ticker: every {@code delay} milliseconds {@code position} is advanced by
 * one and wrapped back to 0 once it reaches {@code frameNum}.
 *
 * <p>Nothing is started unless both {@code delay} and {@code frameNum} are positive. Ticks
 * are subscribed on the IO scheduler and handled on the Android main thread.
 */
void animation() {
    if (!(delay > 0 && frameNum > 0)) {
        return;
    }

    // Pass-through mapping of each tick, kept as in the original pipeline.
    final Function<Long, ObservableSource<Long>> passThrough =
            new Function<Long, ObservableSource<Long>>() {
                @Override
                public ObservableSource<Long> apply(Long aLong) throws Exception {
                    return Observable.just(aLong);
                }
            };

    Observable
            .interval(delay, TimeUnit.MILLISECONDS)
            .flatMap(passThrough)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Long drawable) {
                    // Step to the next frame, cycling back to the first one at the end.
                    position++;
                    if (position >= frameNum) {
                        position = 0;
                    }
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}
Sandbox.java 文件源码
项目:Reactive-Android-Programming
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Emits the numbers 1..1000 as strings, logging each one under "doOnNext" before the switch
 * to the computation scheduler and under "subscribe" after it, then blocks on
 * {@code WAIT_LATCH}.
 *
 * @throws InterruptedException if interrupted while waiting on the latch
 */
private static void demo8() throws InterruptedException {
    Observable.range(1, 1000)
            .map(Objects::toString)
            .doOnNext(text -> log("doOnNext", text))
            .observeOn(Schedulers.computation())
            .subscribe(text -> log("subscribe", text));

    WAIT_LATCH.await();
}
OpenQuestionsTest.java 文件源码
项目:rxjavatraining
阅读 71
收藏 0
点赞 0
评论 0
/**
 * Uses {@code flatMap} to pick a scheduler per item: even inputs become {@code x + 1}
 * observed on the IO scheduler, odd inputs become {@code x + 3} observed on the computation
 * scheduler. Each result is printed together with the name of the thread that received it.
 */
@Test
public void usingFlatMapToJumpConditionallyOntoAnotherThread() throws Exception {
    Observable.just(1, 2, 3, 4)
            .flatMap(number -> number % 2 == 0
                    ? Observable.just(number + 1).observeOn(Schedulers.io())
                    : Observable.just(number + 3).observeOn(Schedulers.computation()))
            .subscribe(result -> {
                System.out.print(Thread.currentThread().getName());
                System.out.println(": " + result);
            });
}
Mastodon.java 文件源码
项目:TootApp
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Requests the public (federated) timeline.
 *
 * <p>The call is made with the bearer value from {@code Toot.buildBearer()} and {@code null}
 * as the second API argument.
 *
 * @return an observable of the response carrying an array of Status with the newest
 *         federated statuses
 */
public Observable<Response<Status[]>> getPublicTimeline() {
    final API api = buildRxRetrofit().create(API.class);
    return api.getPublicTimeline(Toot.buildBearer(), null);
}
RxAnimations.java 文件源码
项目:Rx2Animations
阅读 43
收藏 0
点赞 0
评论 0
/**
 * Fades in the given views with a staggered start: the view at index {@code i} is given a
 * delay of {@code i * delay}.
 *
 * @param delay    delay step between consecutive views
 * @param duration duration applied to every fade-in
 * @param views    views to fade in, in order
 * @return a Completable built from the scheduled fade-in of every view
 */
public static Completable fadeInWithDelay(final int delay, final int duration, final View... views) {
    return Observable.range(0, views.length)
            .flatMapCompletable(index -> animate(views[index], new LinearInterpolator())
                    .duration(duration)
                    .delay(index * delay)
                    .fadeIn()
                    .schedule());
}
NullAwayRxSupportNegativeCases.java 文件源码
项目:NullAway
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Drops containers whose {@code get()} returns {@code null}, then maps every remaining
 * container to the length of its string.
 *
 * <p>NOTE(review): judging by the file name this looks like a static-analysis test fixture in
 * which the exact form of the null check (block lambda with if/else) is what is being
 * exercised - confirm before restructuring the code.
 *
 * @param observable stream of containers whose content may be {@code null}
 * @return stream of string lengths for the containers that passed the filter
 */
private Observable<Integer> filterThenMapNullableContainerLambdas2(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
c -> {
// Explicit if/else form of the null check on the contained value.
if (c.get() == null) {
return false;
} else {
return true;
}
})
// Dereferences c.get() after the filter above has removed the null cases.
.map(c -> c.get().length());
}
LingjuAudioPlayer.java 文件源码
项目:AssistantBySDK
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Plays the entry at {@code position} of the playlist looked up by {@code playlistType}.
 *
 * <p>If that playlist's play mode differs from this player's {@code playMode}, the playlist
 * is first set to this player's mode. The entry is then obtained with
 * {@code getAndMark(position)} and passed on to {@code play(entry, preLoad)}.
 *
 * @param position index handed to {@code getAndMark}
 * @param preLoad  forwarded unchanged to the delegated {@code play} call
 * @return whatever the delegated {@code play} call returns
 */
@Override
public Observable<PlayMusic> play(int position, boolean preLoad) {
// Bring the playlist's play mode in line with this player's before playing.
if (repository.findByListType(playlistType).getPlayMode() != this.playMode) {
repository.findByListType(playlistType).setPlayMode(this.playMode);
}
return play(repository.findByListType(playlistType).getAndMark(position), preLoad);
}
ApiTransformer.java 文件源码
项目:XSnow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that applies the standard threading and retry policy to a request:
 * subscribe and unsubscribe on the IO scheduler, observe on the Android main thread, and
 * retry through an {@code ApiRetryFunc}.
 *
 * @param retryCount       retry count handed to {@code ApiRetryFunc}
 * @param retryDelayMillis retry delay in milliseconds handed to {@code ApiRetryFunc}
 * @param <T>              item type, unchanged by the transformer
 * @return the transformer
 */
public static <T> ObservableTransformer<T, T> norTransformer(final int retryCount, final int retryDelayMillis) {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> apiResultObservable) {
            // Step 1: do the work (and the disposal) off the main thread.
            final Observable<T> onIoThread = apiResultObservable
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io());
            // Step 2: hand results to the main thread.
            final Observable<T> deliveredOnMain =
                    onIoThread.observeOn(AndroidSchedulers.mainThread());
            // Step 3: resubscribe on failure according to the retry policy.
            return deliveredOnMain.retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis));
        }
    };
}
HotMovieListManager.java 文件源码
项目:ZhaZhaShop
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Fetches the hot movie list.
 *
 * <p>The request is subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param limit passed as the second argument of the API call; the first argument is fixed
 *              at 20 (NOTE(review): which of the two is the offset and which the page size
 *              is not visible here - confirm against the API interface)
 * @return the observable returned by the API call, with the schedulers above applied
 */
public Observable<HotMovieBean> getHotMovieList(int limit) {
return RetrofitClient
.getInstance()
.apiServer()
.getHotMovieList(20, limit)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
Ch5_21.java 文件源码
项目:Learning-RxJava
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Funnels two interval sources (one ticking every second, one every 300 ms) through a single
 * {@code PublishSubject} that prints every item, then waits via {@code sleep(3000)}.
 *
 * <p>NOTE(review): both sources push into the same subject, which is not serialized here;
 * presumably they emit on different threads - confirm whether {@code toSerialized()} is
 * wanted.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Observable<String> everySecond = Observable
            .interval(1, TimeUnit.SECONDS)
            .map(tick -> (tick + 1) + " seconds");
    Observable<String> every300Millis = Observable
            .interval(300, TimeUnit.MILLISECONDS)
            .map(tick -> ((tick + 1) * 300) + " milliseconds");

    Subject<String> relay = PublishSubject.create();

    // The printer must be attached before the sources start feeding the subject.
    relay.subscribe(System.out::println);
    everySecond.subscribe(relay);
    every300Millis.subscribe(relay);

    sleep(3000);
}
MemoryDao.java 文件源码
项目:store2realm
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Inserts or updates every item through {@code insertObjectOrUpdate} and emits the results
 * as one list, in the same order as {@code items}.
 *
 * @param items the items to insert or update
 * @return an observable emitting the list of results
 */
@Override
public Observable<List<TestModel>> insertOrUpdate(List<TestModel> items) {
    // The constructor argument only reserves capacity; the list starts empty, so results
    // must be appended. set(i, ...) on the empty list throws IndexOutOfBoundsException.
    List<TestModel> output = new ArrayList<>(items.size());
    for (TestModel item : items) {
        output.add(insertObjectOrUpdate(item));
    }
    return Observable.just(output);
}
GankApi.java 文件源码
项目:csdn-retrofit
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Form-encoded POST to {@code api/add2gank}.
 *
 * @param url   value of the {@code url} form field
 * @param desc  value of the {@code desc} form field
 * @param who   value of the {@code who} form field
 * @param type  value of the {@code type} form field
 * @param debug value of the {@code debug} form field
 * @return an {@code Observable<Object>} for the call's result
 */
@FormUrlEncoded
@POST("api/add2gank")
Observable<Object> postDataByRx(@Field("url") String url,
@Field("desc") String desc,
@Field("who") String who,
@Field("type") String type,
@Field("debug") String debug);