/**
 * Produces one page of mock strings and appends it to the adapter.
 *
 * <p>Page 0 yields an empty list; any other page is taken from
 * {@code AnalogData.analogString(pageNo)}. The result is routed through
 * {@code RxSchedulers.io2main()} before the adapter is updated and the
 * refresh indicator is stopped.
 *
 * <p>NOTE(review): the transformer below never references the timer it is
 * handed, so the 5 second timer does not appear to gate the emission -
 * confirm whether a delay was intended.
 *
 * @param refresh forwarded to {@code addItems} and {@code stopRefresh}
 */
private void loadData(final boolean refresh) {
    final Observable.OnSubscribe<List<String>> pageSource = new Observable.OnSubscribe<List<String>>() {
        @Override
        public void call(Subscriber<? super List<String>> subscriber) {
            final List<String> page;
            if (pageNo == 0) {
                page = new ArrayList<String>();
            } else {
                page = AnalogData.analogString(pageNo);
            }
            subscriber.onNext(page);
            subscriber.onCompleted();
        }
    };

    final Observable.Transformer<Long, List<String>> timerToPage = new Observable.Transformer<Long, List<String>>() {
        @Override
        public Observable<List<String>> call(Observable<Long> ignoredTimer) {
            return Observable.create(pageSource);
        }
    };

    final Action1<List<String>> showPage = new Action1<List<String>>() {
        @Override
        public void call(List<String> items) {
            mBaseAdapter.addItems(items, refresh);
            stopRefresh(refresh);
        }
    };

    Observable.timer(5, TimeUnit.SECONDS)
            .compose(timerToPage)
            .compose(RxSchedulers.<List<String>>io2main())
            .subscribe(showPage);
}
java类rx.functions.Action1的实例源码
EmptyActivity.java 文件源码
项目:BaseVLayoutAdapterHelper
阅读 27
收藏 0
点赞 0
评论 0
VideoCompletePresenter.java 文件源码
项目:GitHub
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Queries all videos whose download status is {@code COMPLETE} and shows
 * them in the view; an empty result is reported through {@code noData()}.
 *
 * @param isRefresh not used by this implementation
 */
@Override
public void getData(boolean isRefresh) {
    final Action1<List<VideoInfo>> showCompleted = new Action1<List<VideoInfo>>() {
        @Override
        public void call(List<VideoInfo> completed) {
            if (!ListUtils.isEmpty(completed)) {
                mView.loadData(completed);
                return;
            }
            mView.noData();
        }
    };

    mDbDao.queryBuilder()
            .where(VideoInfoDao.Properties.DownloadStatus.eq(DownloadStatus.COMPLETE))
            .rx()
            .list()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showCompleted);
}
RxTask.java 文件源码
项目:LiteReader
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Runs a three-stage task: an optional hook at subscription time, a
 * background action, and an optional completion callback on the Android
 * main thread.
 *
 * @param preExecute     optional action invoked when the task is subscribed; may be null.
 * @param doInBackground action executed after the stream has been moved to the io scheduler.
 * @param doOnFinish     optional action invoked on the main thread once the background action has run; may be null.
 * @param onError        receives any exception raised by the task; when null, {@code RxActions.onError()} is used.
 * @return the subscription of the task.
 */
public static Subscription asyncTask(final Action0 preExecute, @NonNull final Action0 doInBackground, final Action0 doOnFinish, Action1<Throwable> onError) {
    final Action0 runPreExecute = new Action0() {
        @Override
        public void call() {
            if (preExecute == null) {
                return;
            }
            preExecute.call();
        }
    };

    final Action1<String> runOnFinish = new Action1<String>() {
        @Override
        public void call(String ignored) {
            if (doOnFinish == null) {
                return;
            }
            doOnFinish.call();
        }
    };

    return Observable.just("Hey nerd! This is an async task.")
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(runPreExecute)
            .observeOn(Schedulers.io())
            .doOnNext(Actions.toAction1(doInBackground))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(runOnFinish, onError == null ? RxActions.onError() : onError);
}
LoveVideoPresenter.java 文件源码
项目:GitHub
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Queries all videos flagged as collected and shows them in the view; an
 * empty result is reported through {@code noData()}.
 *
 * @param isRefresh not used by this implementation
 */
@Override
public void getData(boolean isRefresh) {
    final Action1<List<VideoInfo>> showCollected = new Action1<List<VideoInfo>>() {
        @Override
        public void call(List<VideoInfo> collected) {
            final boolean nothingFound = ListUtils.isEmpty(collected);
            if (nothingFound) {
                mView.noData();
                return;
            }
            mView.loadData(collected);
        }
    };

    mDbDao.queryBuilder()
            .where(VideoInfoDao.Properties.IsCollect.eq(true))
            .rx()
            .list()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showCollected);
}
MyHorizontalScrollView.java 文件源码
项目:editor-sql
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Reports the current horizontal scroll offset to the scroll listener on
 * every touch event, and schedules a "scroll stopped" callback after the
 * finger is lifted.
 *
 * <p>The view may still be moving when the finger goes up, so the stop
 * notification is delayed by 500 ms and delivered on the main thread.
 */
@Override
public boolean onTouchEvent(MotionEvent ev) {
    if (onScrollListener != null) {
        final int currentX = this.getScrollX();
        lastScrollX = currentX;
        onScrollListener.onScroll(currentX);
    }

    if (ev.getAction() == MotionEvent.ACTION_UP) {
        final Action1<Long> notifyStopped = new Action1<Long>() {
            @Override
            public void call(Long tick) {
                if (onScrollListener == null) {
                    return;
                }
                onScrollListener.onScrollStop();
            }
        };
        Observable.timer(500, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
                .subscribe(notifyStopped);
    }

    return super.onTouchEvent(ev);
}
ClosetFragment.java 文件源码
项目:Closet
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Registers for {@code RemoveItemEvent}s: the removed product is dropped
 * from the selected-product list (if present) and the category adapter is
 * told about it. Errors on the bus are printed to the standard error stream.
 */
private void initRxBus() {
    final Action1<RemoveItemEvent> onItemRemoved = new Action1<RemoveItemEvent>() {
        @Override
        public void call(RemoveItemEvent event) {
            final ProductEntity removed = event.getRemoveEntity();
            final boolean wasSelected =
                    DataLayer.getInstance().getDataPool().getSelectedProductEntityList().contains(removed);
            if (wasSelected) {
                DataLayer.getInstance().getDataPool().getSelectedProductEntityList().remove(removed);
            }
            mCategoryProductAdapter.updateData(removed);
        }
    };

    final Action1<Throwable> onBusError = new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            error.printStackTrace();
        }
    };

    addSubscription(RemoveItemEvent.class, onItemRemoved, onBusError);
}
LoopService.java 文件源码
项目:12306_Android
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Verifies the entered captcha with the server and, when accepted,
 * continues with order submission; otherwise the failure is announced on
 * the bus and a new captcha is requested.
 *
 * <p>The request runs on the io scheduler and the response is handled on
 * the Android main thread.
 *
 * <p>NOTE(review): no error callback is registered, so a failing request
 * surfaces as an unhandled Rx error - decide how network failures should
 * be reported.
 */
private void checkRandCodeAndNext() {
    final HttpService service = RetrofitManager.getInstance().getService();
    service.checkRandCode2(randCode, "randp", "", orderParam.getREPEAT_SUBMIT_TOKEN())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<CheckRandCodeResponse>() {
                @Override
                public void call(CheckRandCodeResponse checkRandCodeResponse) {
                    // Constant-first comparison: a missing result is treated as a
                    // rejected captcha instead of throwing a NullPointerException.
                    if ("1".equals(checkRandCodeResponse.getData().getResult())) {
                        RxBus.getDefault().post("pass");
                        submitOrderNext(service, orderParam.getREPEAT_SUBMIT_TOKEN());
                    } else {
                        RxBus.getDefault().post("验证码错误");
                        RxBus.getDefault().post("clear");
                        refreshPassCode();
                    }
                }
            });
}
HomePresenterImpl.java 文件源码
项目:HLOLI
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Loads a page of news for a channel and pushes it to the view.
 *
 * <p>A refresh restarts paging at page 0 and stores the received list via
 * {@code SharedPreManager}. When the response reports {@code "True"} for
 * its next flag, the next page number is taken from the response.
 *
 * @param channelId channel to load
 * @param refresh   true to restart from the first page
 */
@Override
public void getNewsData(int channelId, final boolean refresh) {
    if (refresh) {
        pageNum = 0;
    }

    final Action1<NewsList> onLoaded = new Action1<NewsList>() {
        @Override
        public void call(NewsList result) {
            mView.onNewsData(result.getList(), refresh);
            mView.stopLoading();

            final boolean hasNextPage = "True".equals(result.getNext());
            if (hasNextPage) {
                pageNum = Integer.parseInt(result.getNextpage());
            }

            if (refresh) {
                SharedPreManager.getInstance().putNewsList(mContext, result);
            }
        }
    };

    mModel.getNewsData(channelId, pageNum, onLoaded);
}
RxUtil.java 文件源码
项目:GitHub
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Builds a transformer that subscribes upstream on the io scheduler, writes
 * every emitted item to {@code ACache} as JSON under {@code key}, and
 * delivers items downstream on the Android main thread.
 *
 * <p>The cache write is scheduled on its own io worker so it does not
 * delay the emission. The worker is released once the write has finished;
 * null items are passed through without being cached.
 *
 * @param key cache key the serialized item is stored under
 * @param <T> item type
 * @return the caching transformer
 */
public static <T> Observable.Transformer<T, T> rxCacheBeanHelper(final String key) {
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> observable) {
            return observable
                    .subscribeOn(Schedulers.io()) // upstream, and therefore doOnNext, runs on an io thread
                    .doOnNext(new Action1<T>() {
                        @Override
                        public void call(final T data) {
                            if (data == null) {
                                // Nothing to serialize; data.getClass() would throw.
                                return;
                            }
                            final rx.Scheduler.Worker worker = Schedulers.io().createWorker();
                            worker.schedule(new Action0() {
                                @Override
                                public void call() {
                                    try {
                                        LogUtils.d("get data from network finish ,start cache...");
                                        ACache.get(ReaderApplication.getsInstance())
                                                .put(key, new Gson().toJson(data, data.getClass()));
                                        LogUtils.d("cache finish");
                                    } finally {
                                        // A worker must be unsubscribed when no longer needed,
                                        // otherwise it is never handed back to the scheduler.
                                        worker.unsubscribe();
                                    }
                                }
                            });
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
FilterActivity.java 文件源码
项目:android-advanced-light
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Demonstrates {@code throttleFirst}: a source emits 0..9 with a 100 ms
 * pause after each value, and only the first value of every 200 ms window
 * is logged.
 *
 * <p>Emission stops early when the subscriber has unsubscribed or when the
 * emitting thread is interrupted; in the latter case the interrupt flag is
 * restored so callers further up the stack can observe it.
 */
private void throttleFirst() {
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i < 10; i++) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag and stop emitting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            subscriber.onCompleted();
        }
    }).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d(TAG, "throttleFirst:" + integer);
        }
    });
}
PlacesResultActivity.java 文件源码
项目:GitHub
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Looks up the place identified by {@code placeId} and shows its name,
 * coordinates and address.
 *
 * <p>The received buffer is always released, even when populating the
 * views throws, and an empty buffer is ignored rather than indexed.
 */
@Override
protected void onLocationPermissionGranted() {
    compositeSubscription = new CompositeSubscription();
    compositeSubscription.add(reactiveLocationProvider.getPlaceById(placeId)
            .subscribe(new Action1<PlaceBuffer>() {
                @Override
                public void call(PlaceBuffer buffer) {
                    try {
                        if (buffer.getCount() > 0) {
                            Place place = buffer.get(0);
                            if (place != null) {
                                placeNameView.setText(place.getName());
                                placeLocationView.setText(place.getLatLng().latitude + ", " + place.getLatLng().longitude);
                                placeAddressView.setText(place.getAddress());
                            }
                        }
                    } finally {
                        // Release in finally so the buffer is not leaked on failure.
                        buffer.release();
                    }
                }
            }));
}
GeofenceActivity.java 文件源码
项目:GitHub
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Replaces the geofences registered for the notification pending intent:
 * existing ones are removed first, then the newly built request is added.
 * The outcome is shown in a toast; failures are also logged.
 *
 * <p>Does nothing when no geofencing request could be created.
 */
private void addGeofence() {
    final GeofencingRequest request = createGeofencingRequest();
    if (request == null) {
        return;
    }
    final PendingIntent notificationIntent = createNotificationBroadcastPendingIntent();

    final Func1<Status, Observable<Status>> addAfterRemoval = new Func1<Status, Observable<Status>>() {
        @Override
        public Observable<Status> call(Status removalResult) {
            return reactiveLocationProvider.addGeofences(notificationIntent, request);
        }
    };

    final Action1<Status> reportResult = new Action1<Status>() {
        @Override
        public void call(Status addResult) {
            toast("Geofence added, success: " + addResult.isSuccess());
        }
    };

    final Action1<Throwable> reportFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            toast("Error adding geofence.");
            Log.d(TAG, "Error adding geofence.", error);
        }
    };

    reactiveLocationProvider
            .removeGeofences(notificationIntent)
            .flatMap(addAfterRemoval)
            .subscribe(reportResult, reportFailure);
}
GenericProfile.java 文件源码
项目:SensorTag2Testing
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Writes the configuration bytes to the configuration characteristic and,
 * for every profile except the temperature one, also writes a shorter
 * update period to the period characteristic.
 *
 * <p>Both writes report their result to {@code action} on the Android main
 * thread and route failures to {@code onConnectionFailure}.
 *
 * @param action receives the bytes reported back by each write
 */
protected void configurationImp(@NonNull Action1<byte[]> action) {
    mConn.flatMap(connection -> connection.writeCharacteristic(uuidConf, baConf))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(action, this::onConnectionFailure);

    // The faster period is meant for compass calibration; per the original
    // author the temperature sensor does not work at this speed, so skip it.
    final boolean isTemperatureProfile = uuidConf.equals(UUID.fromString(TempertureProfile.GattConf));
    if (isTemperatureProfile) {
        return;
    }

    // Period 0x14 = 200ms
    final byte[] fastPeriod = new byte[]{(byte) 0x14};
    mConn.flatMap(connection -> connection.writeCharacteristic(uuidPeri, fastPeriod))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(action, this::onConnectionFailure);
}
DeliverReplayTest.java 文件源码
项目:GitHub
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Exercises a paging pipeline composed with {@code DeliverReplay}: page
 * requests are turned into page loads, and the observer's received
 * deliveries are compared with an expected list after each step.
 *
 * NOTE(review): the expected values suggest PAGE_SIZE is 3 and
 * requestedPageCount starts at 1 - confirm against the fixture fields.
 */
@Test
public void testPagingCapabilities() {
PublishSubject<Object> view = PublishSubject.create();
BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create();
final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>();
nextPageRequests
// A target at or below the already requested count yields never(); otherwise
// the range of not-yet-requested page indices is emitted.
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer targetPage) {
return targetPage <= requestedPageCount ?
Observable.<Integer>never() :
Observable.range(requestedPageCount, targetPage - requestedPageCount);
}
})
// Remember how many pages have been requested so far.
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer it) {
requestedPageCount = it + 1;
}
})
// Pages requested before this subscription are emitted first.
.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer page) {
return requestPage(page, PAGE_SIZE);
}
})
.compose(new DeliverReplay<Object, String>(view))
.subscribe(testObserver);
// Nothing is expected before the view subject has emitted.
ArrayList<Delivery<Object, String>> onNext = new ArrayList<>();
testObserver.assertReceivedOnNext(onNext);
// First view value: expectations for items 0..2 are added.
view.onNext(999);
addOnNext(onNext, 999, 0, 1, 2);
testObserver.assertReceivedOnNext(onNext);
// Requesting page 2: expectations for items 3..5 are added.
nextPageRequests.onNext(2);
addOnNext(onNext, 999, 3, 4, 5);
testObserver.assertReceivedOnNext(onNext);
// A null view value: no completion and no new deliveries are expected.
view.onNext(null);
assertEquals(0, testObserver.getOnCompletedEvents().size());
testObserver.assertReceivedOnNext(onNext);
// A request made while the view value is null is expected to deliver nothing yet.
nextPageRequests.onNext(3);
assertEquals(0, testObserver.getOnCompletedEvents().size());
testObserver.assertReceivedOnNext(onNext);
// A new view value: expectations for items 0..8 are added for it.
view.onNext(9999);
addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8);
assertEquals(0, testObserver.getOnCompletedEvents().size());
testObserver.assertReceivedOnNext(onNext);
}
StaffDetailSummaryViewModel.java 文件源码
项目:LiteReader
阅读 21
收藏 0
点赞 0
评论 0
/**
 * On the first attach only, extracts the staff summary through
 * {@code RxTask.asyncMap} and renders it.
 *
 * <p>A summary of exactly two characters is shown as the "no introduction"
 * placeholder with the toggle hidden; anything else is passed to
 * {@code initText}. A loading-complete signal is sent on the bus afterwards.
 */
@Override
public void onViewAttached(View view) {
    if (hasAttach) {
        return;
    }
    hasAttach = true;

    final Func1<String, String> loadSummary = new Func1<String, String>() {
        @Override
        public String call(String ignored) {
            return extractContent(prefix + id);
        }
    };

    final Action1<String> showSummary = new Action1<String>() {
        @Override
        public void call(String summary) {
            final boolean hasContent = summary.length() != 2;
            if (hasContent) {
                initText(summary);
            } else {
                getSelfView().getBinding().tvSummary.setText(getString(R.string.douban_staff_no_introduce));
                getSelfView().getBinding().ivToggle.setVisibility(View.GONE);
            }
            RxBus.getInstance().send(true, Constant.LOADING_COMPLETE_SIGNAL); // signal that loading has finished
        }
    };

    RxTask.asyncMap(loadSummary, showSummary);
}
OperatorPublish.java 文件源码
项目:boohee_v5.6
阅读 19
收藏 0
点赞 0
评论 0
/**
 * Connects this operator to its source.
 *
 * <p>Reuses the current subscriber when one exists and is still
 * subscribed; otherwise a fresh subscriber is installed atomically. Only
 * the caller that flips {@code shouldConnect} from false to true performs
 * the subscription to the source.
 *
 * @param connection receives the subscription representing the connection
 */
public void connect(Action1<? super Subscription> connection) {
    PublishSubscriber<T> ps;
    boolean doConnect;
    for (;;) {
        ps = (PublishSubscriber<T>) this.current.get();
        if (ps == null || ps.isUnsubscribed()) {
            // No usable connection: try to install a new subscriber.
            PublishSubscriber<T> u = new PublishSubscriber<T>(this.current);
            u.init();
            if (!this.current.compareAndSet(ps, u)) {
                // Lost the race against another connect/disconnect; re-read.
                continue;
            }
            ps = u;
        }
        // Only replace ps when a new subscriber was actually installed; an
        // existing live connection must be kept as is.
        doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
        break;
    }
    // Hand out the connection before subscribing so that a synchronous
    // source can already be disconnected from within the callback.
    connection.call(ps);
    if (doConnect) {
        this.source.unsafeSubscribe(ps);
    }
}
OrderBy.java 文件源码
项目:csync-android
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Appends the wrapped query followed, when at least one field is set, by an
 * {@code ORDER BY} clause listing the fields separated by commas and ending
 * in {@code DESC} or {@code ASC}.
 *
 * @param sb buffer to append to
 * @return the same buffer
 */
@Override
public StringBuffer fill(final StringBuffer sb) {
    query.fill(sb);
    if (fields.length == 0) {
        return sb;
    }

    final Action1<String> appendField = new Action1<String>() {
        @Override
        public void call(String field) {
            sb.append(field);
        }
    };
    final Runnable appendSeparator = new Runnable() {
        @Override
        public void run() {
            sb.append(",");
        }
    };

    sb.append(" ORDER BY ");
    visit(fields, appendField, appendSeparator);
    if (desc) {
        sb.append(" DESC");
    } else {
        sb.append(" ASC");
    }
    return sb;
}
FileFragment.java 文件源码
项目:editor-sql
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Configures the swipe-to-refresh layout: on refresh, after a 600 ms delay
 * on the main thread, the file-data loader is destroyed and re-initialised
 * with a refresh request for the adapter's current parent path.
 */
@Override
protected void initView() {
    swipeRefreshLayout.setColorSchemeResources(R.color.app_bar_bg_dark);

    final Action1<Long> reloadFiles = new Action1<Long>() {
        @Override
        public void call(Long tick) {
            final int loaderId = DataType.fileData.ordinal();
            final Bundle args = new Bundle();
            args.putSerializable(Constant.FILE_DATA_TYPE, FileDataType.refresh);
            args.putString(Constant.BACK_PATH, adapter.getParentPath());
            getLoaderManager().destroyLoader(loaderId);
            getLoaderManager().initLoader(loaderId, args, adapter);
        }
    };

    final SwipeRefreshLayout.OnRefreshListener refreshListener = new SwipeRefreshLayout.OnRefreshListener() {
        @Override
        public void onRefresh() {
            Observable.timer(600, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
                    .subscribe(reloadFiles);
        }
    };

    swipeRefreshLayout.setOnRefreshListener(refreshListener);
}
ResolversPresenter.java 文件源码
项目:AppChooser
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Loads the resolvers for a MIME type, replacing any subscriptions held so far.
 *
 * <p>No resolver: fall back to loading media types. Exactly one resolver:
 * open the file with it directly, falling back to media types when its
 * activity info cannot be loaded. Several resolvers: let the view show them.
 *
 * @param mimeType MIME type to resolve
 */
@Override
public void loadResolvers(@NonNull final String mimeType) {
    mSubscription.clear();

    final Action1<List<Resolver>> onResolvers = new Action1<List<Resolver>>() {
        @Override
        public void call(List<Resolver> found) {
            final int count = (found == null) ? 0 : found.size();
            if (count == 0) {
                loadMediaTypes();
                return;
            }
            if (count > 1) {
                mView.showResolvers(found);
                return;
            }
            try {
                mView.showFileContent(found.get(0).loadActivityInfo(mimeType), mFile, mRequestCode);
            } catch (AppChooserException e) {
                loadMediaTypes();
            }
        }
    };

    Subscription pending = loadResolversInternal(mimeType).subscribe(onResolvers);
    mSubscription.add(pending);
}
OperatorReplay.java 文件源码
项目:boohee_v5.6
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Connects this replay operator to its source.
 *
 * <p>Reuses the current subscriber when one exists and is still
 * subscribed; otherwise a fresh subscriber with a new buffer from
 * {@code bufferFactory} is installed atomically. Only the caller that
 * flips {@code shouldConnect} from false to true performs the
 * subscription to the source.
 *
 * @param connection receives the subscription representing the connection
 */
public void connect(Action1<? super Subscription> connection) {
    ReplaySubscriber<T> ps;
    boolean doConnect;
    for (;;) {
        ps = (ReplaySubscriber<T>) this.current.get();
        if (ps == null || ps.isUnsubscribed()) {
            // No usable connection: try to install a new subscriber.
            ReplaySubscriber<T> u = new ReplaySubscriber<T>(this.current, (ReplayBuffer<T>) this.bufferFactory.call());
            u.init();
            if (!this.current.compareAndSet(ps, u)) {
                // Lost the race against another connect/disconnect; re-read.
                continue;
            }
            ps = u;
        }
        // Only replace ps when a new subscriber was actually installed; an
        // existing live connection must be kept as is.
        doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
        break;
    }
    // Hand out the connection before subscribing so that a synchronous
    // source can already be disconnected from within the callback.
    connection.call(ps);
    if (doConnect) {
        this.source.unsafeSubscribe(ps);
    }
}
RxCallAdapter.java 文件源码
项目:pcloud-networking-java
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Wraps a call in an observable. Each subscription works on its own clone
 * of the call: the clone is executed, its result is emitted followed by
 * completion, and any throwable is forwarded as an error. The clone is
 * cancelled when the subscription ends.
 *
 * @param call the call to adapt
 * @return an observable emitting the result of the call
 */
@Override
public Observable<T> adapt(final Call<T> call) {
    final Func0<Call<T>> cloneForSubscriber = new Func0<Call<T>>() {
        @Override
        public Call<T> call() {
            return call.clone();
        }
    };

    final Action2<Call<T>, Observer<? super T>> executeAndEmit = new Action2<Call<T>, Observer<? super T>>() {
        @Override
        public void call(Call<T> pending, Observer<? super T> downstream) {
            try {
                downstream.onNext(pending.execute());
                downstream.onCompleted();
            } catch (Throwable failure) {
                downstream.onError(failure);
            }
        }
    };

    final Action1<Call<T>> cancelOnUnsubscribe = new Action1<Call<T>>() {
        @Override
        public void call(Call<T> pending) {
            pending.cancel();
        }
    };

    return Observable.<T>create(
            SyncOnSubscribe.createSingleState(cloneForSubscriber, executeAndEmit, cancelOnUnsubscribe));
}
AddActivity.java 文件源码
项目:DizzyPassword
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Subscribes to {@code RxBean} events and copies a non-empty password from
 * the event into the password field on the main thread.
 *
 * <p>NOTE(review): a new subscription is created on every resume and is
 * never unsubscribed in this method - confirm it is released elsewhere.
 */
@Override
protected void onResume() {
    super.onResume();
    RxBus.getInstance().toObserverable(RxBean.class)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<RxBean>() {
                @Override
                public void call(RxBean rxBean) {
                    // Compare by content: == only matches the interned literal,
                    // so an empty string built at runtime would slip through.
                    if ("".equals(rxBean.getPwd())) {
                        return;
                    }
                    et_password.setText(rxBean.getPwd());
                }
            });
}
IplayPresenter.java 文件源码
项目:newIPlay
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Requests the community data and, on success, notifies the view and
 * refreshes its list with the received discussion entries. Errors are
 * handled by {@code mOnError}.
 *
 * @param params not used by this implementation
 */
@Override
public void requestData(RequestParamsBean params) {
    Log.i("tag", "KaienPresenter requestData()");

    final Action1<IplayCommunity> onLoaded = new Action1<IplayCommunity>() {
        @Override
        public void call(IplayCommunity community) {
            Log.i("tag", "KaienPresenter call()" + community.getInfo().getDiscuzList());
            mView.onLoadSuccesed();
            mView.refreshRecyclerView(community.getInfo().getDiscuzList());
        }
    };

    mSubscription = ApiManager.getInstance()
            .getNewsApi()
            .getIplayCommunityData()
            .compose(this.<IplayCommunity>applySchedulers())
            .subscribe(onLoaded, mOnError);
}
DoubanCommentListViewModel.java 文件源码
项目:LiteReader
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Fetches one page of comments and hands the comment list to {@code onNext}.
 * On an HTTP error the loading-more flag is cleared and the refresh
 * indicator is hidden.
 *
 * @param onNext receives the comments of the fetched page
 */
private void getCommentList(Action1<List<Comment>> onNext) {
    final Func1<DoubanCommentList, List<Comment>> extractComments = new Func1<DoubanCommentList, List<Comment>>() {
        @Override
        public List<Comment> call(DoubanCommentList page) {
            return page.comments;
        }
    };

    final ActionHttpError resetLoadingState = new ActionHttpError() {
        @Override
        protected void onError() {
            setIsLoadingMore(false);
            hideRefreshing();
        }
    };

    service.getCommentList(id, start, perPage)
            .compose(new ThreadDispatcher<DoubanCommentList>())
            .map(extractComments)
            .subscribe(onNext, resetLoadingState);
}
LoginPresenter.java 文件源码
项目:AppCommonFrame
阅读 43
收藏 0
点赞 0
评论 0
/**
 * Shows a loading message and then requests a verification code on the io
 * scheduler.
 *
 * <p>{@code doOnSubscribe} is placed before
 * {@code subscribeOn(mainThread())} on purpose: the hook runs on the
 * scheduler of the closest {@code subscribeOn} that follows it, so the
 * loading UI call runs on the main thread whichever thread calls this method.
 */
@Override
public void getVerifyCode() {
    Subscription subscription = Observable.just("")
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    mLoginView.showLoading("获取验证码...");
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.io())
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object o) {
                    mDataRepository.getVerifyCode("3", getIMEI(mLoginView.getViewContext()), mCallbackImp);
                }
            });
    mSubscriptions.add(subscription);
}
OnePlusNLayoutActivity.java 文件源码
项目:BaseVLayoutAdapterHelper
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Produces the mock model list through {@code RxSchedulers.io2main()} and
 * adds it to both the base adapter and the one-plus-N layout adapter.
 */
@Override
protected void onProcessLogic() {
    final Observable.OnSubscribe<List<NormalModel>> mockSource = new Observable.OnSubscribe<List<NormalModel>>() {
        @Override
        public void call(Subscriber<? super List<NormalModel>> subscriber) {
            subscriber.onNext(AnalogData.analogNormalModel());
            subscriber.onCompleted();
        }
    };

    final Action1<List<NormalModel>> fillAdapters = new Action1<List<NormalModel>>() {
        @Override
        public void call(List<NormalModel> models) {
            mBaseAdapter.addItems(models);
            mOnePlusNLayoutAdapter.addItems(models);
        }
    };

    Observable.create(mockSource)
            .compose(RxSchedulers.<List<NormalModel>>io2main())
            .subscribe(fillAdapters);
}
Fragment_SequenceEqual.java 文件源码
项目:Go-RxJava
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Demonstrates {@code sequenceEqual}: two observables that both emit
 * "A" then "B" are compared, and the resulting boolean is printed.
 */
public void runCode() {
    final Observable<String> first = Observable.just("A", "B");
    final Observable<String> second = Observable.just("A", "B");

    final Action1<Boolean> printResult = new Action1<Boolean>() {
        @Override
        public void call(Boolean sameSequence) {
            println(sameSequence);
        }
    };

    Observable.sequenceEqual(first, second).subscribe(printResult);
}
GankPresenterImpl.java 文件源码
项目:LueansRead
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Delivers loaded data to the view - as refreshed data when {@code isTop}
 * is true, otherwise as additional data - and hides the loading indicator
 * two seconds later on the main thread.
 *
 * @param isTop true when the data replaces the list from the top
 * @param data  the loaded data
 */
@Override
public void onSuccess(boolean isTop, GankBean data) {
    Log.i(TAG, "onSuccess: ----------------------------------");

    if (!isTop) {
        mView.setMoreGankData(data);
    } else {
        mView.setRefreshData(data);
    }

    final Action1<Long> hideLoading = new Action1<Long>() {
        @Override
        public void call(Long tick) {
            mView.hideLoading();
        }
    };

    Observable.timer(2, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(hideLoading);
}
RxSocialAuth.java 文件源码
项目:RxSocialAuth
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Sign out from both Google and Facebook and disable auto sign in for Smart Lock Password.
 * Require a FragmentActivity
 *
 * <p>The returned subject emits the first status produced by any of the
 * three operations and then completes. A failure of any operation is
 * forwarded to the subject as an error.
 *
 * @param activity the activity
 * @return a PublishSubject<RxStatus>
 */
public PublishSubject<RxStatus> signOut(FragmentActivity activity) {
    // Keep a local reference: the field is replaced by every call, and the
    // callbacks below must talk to the subject returned by *this* call.
    final PublishSubject<RxStatus> statusSubject = PublishSubject.create();
    mStatusObserver = statusSubject;

    Observable<RxStatus> rxGoogleSignOut = new RxGoogleAuth.Builder(activity).build().signOut();
    Observable<RxStatus> rxFacebookSignOut = new RxFacebookAuth.Builder(activity).build().signOut();
    Observable<RxStatus> rxSmartLockDisableAutoSignin =
            new RxSmartLockPasswords.Builder(activity).build().disableAutoSignIn();

    Observable.merge(rxGoogleSignOut, rxFacebookSignOut, rxSmartLockDisableAutoSignin)
            .subscribe(new Action1<RxStatus>() {
                @Override
                public void call(RxStatus rxStatus) {
                    statusSubject.onNext(rxStatus);
                    statusSubject.onCompleted();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // Without an error callback a failing sign-out would be
                    // thrown as OnErrorNotImplementedException.
                    statusSubject.onError(throwable);
                }
            });
    return statusSubject;
}
GetCookiesInterceptor.java 文件源码
项目:GongXianSheng
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Passes the request through unchanged and, for authentication responses
 * that carry a session cookie, stores the cookie in the preferences.
 *
 * <p>Only the part of each cookie header before the first {@code ';'} is
 * kept. The stored value is saved when it contains {@code "session_id"}
 * and the request path contains {@code "/gongfu/v2/authenticate"}.
 *
 * <p>NOTE(review): the kept parts are concatenated without a separator,
 * as before - confirm this is what the reader of the preference expects
 * when several cookie headers are present.
 *
 * @param chain the interceptor chain
 * @return the response obtained from the chain, unmodified
 * @throws IOException when proceeding with the chain fails
 */
@Override
public Response intercept(Chain chain) throws IOException {
    Response originalResponse = chain.proceed(chain.request());
    if (!originalResponse.headers(KEY_COOKIE).isEmpty()) {
        final StringBuilder cookieBuffer = new StringBuilder();
        for (String header : originalResponse.headers(KEY_COOKIE)) {
            // indexOf instead of split(";")[0]: split returns an empty array
            // for a value made only of separators, which made [0] throw.
            final int attributesStart = header.indexOf(';');
            cookieBuffer.append(attributesStart < 0 ? header : header.substring(0, attributesStart));
        }
        final String cookies = cookieBuffer.toString();
        if (cookies.contains("session_id")
                && chain.request().url().uri().getPath().contains("/gongfu/v2/authenticate")) {
            PreferencesHelper.setCookie(cookies);
            Log.i("session_id", cookies);
        }
    }
    return originalResponse;
}