public Observable<RxBeaconRange> beaconsInRegion() {
return startup()
.flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
@Override
public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
beaconManager.addRangeNotifier(new RangeNotifier() {
@Override
public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
}
});
beaconManager.startRangingBeaconsInRegion(getRegion());
}
});
}
});
}
java类io.reactivex.ObservableOnSubscribe的实例源码
RxBeacon.java 文件源码
项目:RxBeacon
阅读 27
收藏 0
点赞 0
评论 0
ThrottleLastExampleActivity.java 文件源码
项目:GitHub
阅读 27
收藏 0
点赞 0
评论 0
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
RecipeDatabaseHelper.java 文件源码
项目:BakingApp
阅读 33
收藏 0
点赞 0
评论 0
public void insertRecipes(@NonNull final ArrayList<Recipe> recipes, Observer<Integer> observer){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
ContentValues[] contentValues = new ContentValues[recipes.size()];
for (int i = 0; i < recipes.size(); ++i) {
contentValues[i] = buildContentValuesFromRecipe(recipes.get(i));
}
int recipesAdded = mContext.getContentResolver().bulkInsert(RecipesContract.RecipeEntry.CONTENT_URI, contentValues);
if (recipesAdded != 0){
e.onNext(recipesAdded);
} else {
e.onError(new NullPointerException("Failed to insert"));
}
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
TaskRepository.java 文件源码
项目:simple-stack
阅读 33
收藏 0
点赞 0
评论 0
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
Realm realm = Realm.getDefaultInstance();
final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm);
final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
if(element.isLoaded() && !emitter.isDisposed()) {
List<Task> tasks = mapFrom(element);
if(!emitter.isDisposed()) {
emitter.onNext(tasks);
}
}
};
emitter.setDisposable(Disposables.fromAction(() -> {
if(dbTasks.isValid()) {
dbTasks.removeChangeListener(realmChangeListener);
}
realm.close();
}));
dbTasks.addChangeListener(realmChangeListener);
}).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}
DebounceExampleActivity.java 文件源码
项目:RxJava2-Android-Sample
阅读 26
收藏 0
点赞 0
评论 0
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
//1(drop)--(400s<500s)---2(pass)---(600s>500s)---3(drop)---(100s<500s)---4(pass)---(605s>500s)---5(pass)---510s
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(600);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
IconShowPresenter.java 文件源码
项目:MBEStyle
阅读 30
收藏 0
点赞 0
评论 0
public Disposable getAllIcons() {
return Observable.create(new ObservableOnSubscribe<IconBean>() {
@Override
public void subscribe(@NonNull ObservableEmitter<IconBean> e) throws Exception {
XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
if (xml.getEventType() == XmlPullParser.START_TAG) {
if (xml.getName().startsWith("item")) {
IconBean bean = new IconBean();
String iconName = xml.getAttributeValue(null, "drawable");
bean.id = mView.getResources().getIdentifier(
iconName, "drawable", BuildConfig.APPLICATION_ID);
bean.name = iconName;
e.onNext(bean);
}
}
xml.next();
}
e.onComplete();
}
}).toList().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<IconBean>>() {
@Override
public void accept(List<IconBean> list) throws Exception {
mView.onLoadData(list);
}
});
}
MapActivity.java 文件源码
项目:Rxjava2.0Demo
阅读 31
收藏 0
点赞 0
评论 0
private void map() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
info += s + "\n";
tv.setText(info);
}
});
}
NetTest.java 文件源码
项目:OKHttpLoggingInterceptor
阅读 28
收藏 0
点赞 0
评论 0
public void test()
{
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception
{
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception
{
System.out.println(integer);
}
});
}
UnlimitPostActivity.java 文件源码
项目:GetStartRxJava2.0
阅读 33
收藏 0
点赞 0
评论 0
private void doRxJavaWork() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (;;) { // 无限循环发送事件
emitter.onNext(Integer.MAX_VALUE);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});
}
BasicTest.java 文件源码
项目:Android-Code-Demos
阅读 31
收藏 0
点赞 0
评论 0
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Today's news update");
e.onNext("Today's topic is Study");
e.onComplete();
}
});
/* 下面两个方法作用类似,just 的内部调用的就是 fromArray */
// return Observable.just("Topic 1", "Heat 1", "News");
// return Observable.fromArray("Topic 1", "Heat 1", "News");
/* 只能发送一个数据 */
/*return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Topic is Study";
}
});*/
}
ConfigPresenter.java 文件源码
项目:NeteaseCloudMusic
阅读 31
收藏 0
点赞 0
评论 0
public void requestLoadingList() {
Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
@Override
public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
mModel = ConfigModel.getInstance(configView.getContext());
e.onNext(mModel.getConfigList());
mModel.setConfigCallback(ConfigPresenter.this);
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ConfigBean>>() {
@Override
public void accept(List<ConfigBean> list) throws Exception {
configView.displayConfigList(list);
}
});
}
RxClick.java 文件源码
项目:AndroidMVPresenter
阅读 30
收藏 0
点赞 0
评论 0
public static Observable<View> with(final View view) {
return Observable.create(new ObservableOnSubscribe<View>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
view.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View value) {
e.onNext(value);
}
});
}
});
}
});
}
TingPlayProcessor.java 文件源码
项目:AssistantBySDK
阅读 34
收藏 0
点赞 0
评论 0
/**
* 按一级分类查找专辑
**/
private Observable<List<Album>> getAlbumByCate(String cateId, int calc_dimension) {
final Map<String, String> params = new HashMap<>();
params.put(DTransferConstants.CATEGORY_ID, cateId);
params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
return Observable.create(new ObservableOnSubscribe<List<Album>>() {
@Override
public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
CommonRequest.getAlbumList(params, new IDataCallBack<AlbumList>() {
@Override
public void onSuccess(AlbumList albumList) {
//onNext的参数不允许为null
e.onNext(albumList.getAlbums());
e.onComplete();
}
@Override
public void onError(int i, String s) {
e.onError(new Throwable(i + " " + s));
}
});
}
}).subscribeOn(Schedulers.io());
}
TingPlayProcessor.java 文件源码
项目:AssistantBySDK
阅读 30
收藏 0
点赞 0
评论 0
/**
* 按关键词查找专辑
**/
private Observable<List<Album>> getAlbumByKeyWord(String keyword, String cateId, int calc_dimension) {
final Map<String, String> params = new HashMap<>();
params.put(DTransferConstants.SEARCH_KEY, keyword);
params.put(DTransferConstants.CATEGORY_ID, cateId);
params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
return Observable.create(new ObservableOnSubscribe<List<Album>>() {
@Override
public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
CommonRequest.getSearchedAlbums(params, new IDataCallBack<SearchAlbumList>() {
@Override
public void onSuccess(SearchAlbumList searchAlbumList) {
e.onNext(searchAlbumList.getAlbums());
e.onComplete();
}
@Override
public void onError(int i, String s) {
e.onError(new Throwable(i + " " + s));
}
});
}
})
.subscribeOn(Schedulers.io());
}
StandardBuildsService.java 文件源码
项目:starcraft-2-build-player
阅读 24
收藏 0
点赞 0
评论 0
/**
* Returns an observable on the progress of loading stock build orders into the local SQLite DB.
* Should be scheduled on a worker thread.
*
* @param c context
* @param forceLoad if false, builds are only copied if an upgrade is required. If true,
* standard builds are always copied.
* @return observable on load progress (percentage)
*/
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
try {
if (!emitter.isDisposed()) {
loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() {
@Override
public void onProgressUpdate(int percent) {
if (!emitter.isDisposed()) {
emitter.onNext(percent);
}
}
});
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}
}
});
}
ReactiveAirplaneMode.java 文件源码
项目:ReactiveAirplaneMode
阅读 30
收藏 0
点赞 0
评论 0
/**
* Observes Airplane Mode state of the device with BroadcastReceiver.
* RxJava2 Observable emits true if the airplane mode turns on and false otherwise.
*
* @param context of the Application or Activity
* @return RxJava2 Observable with Boolean value indicating state of the airplane mode
*/
public Observable<Boolean> observe(final Context context) {
checkContextIsNotNull(context);
final IntentFilter filter = createIntentFilter();
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
throws Exception {
final BroadcastReceiver receiver = createBroadcastReceiver(emitter);
context.registerReceiver(receiver, filter);
final Disposable disposable = disposeInUiThread(new Action() {
@Override public void run() throws Exception {
tryToUnregisterReceiver(receiver, context);
}
});
emitter.setDisposable(disposable);
}
});
}
AccountingActivity.java 文件源码
项目:AssistantBySDK
阅读 105
收藏 0
点赞 0
评论 0
/**
* 更新余额、当日收支
**/
public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
CountToday();
CountBalance(type, taskcards);
e.onNext(0);
}
})
.subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程
.observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false))
mTvBalance.setText("¥" + AssistUtils.formatAmount(balance));
mAdapter.notifyItemChanged(0);
}
});
}
RxGps.java 文件源码
项目:RxGps
阅读 42
收藏 0
点赞 0
评论 0
private Observable<Boolean> checkPlayServicesAvailable() {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
final Activity activity = activityReference.get();
if (activity != null) {
final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
final int status = apiAvailability.isGooglePlayServicesAvailable(activity);
if (status != ConnectionResult.SUCCESS) {
e.onError(new PlayServicesNotAvailableException());
} else {
e.onNext(true);
e.onComplete();
}
}
}
});
}
RxJavaUnitTest.java 文件源码
项目:code-examples-android-expert
阅读 38
收藏 0
点赞 0
评论 0
@Test public void test(){
Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
@Override
public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
try {
List<Todo> todos = RxJavaUnitTest.this.getTodos();
if (todos!=null){
throw new NullPointerException("todos was null");
}
for (Todo todo : todos) {
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
});
TestObserver<Object> testObserver = new TestObserver<>();
todoObservable.subscribeWith(testObserver);
testObserver.assertError(NullPointerException.class);
}
DemoForTraceActivity.java 文件源码
项目:SAF-AOP
阅读 29
收藏 0
点赞 0
评论 0
@Trace(enable = false)
private void initData() {
Observable.create(new ObservableOnSubscribe<String>() {
@Trace
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("111");
e.onNext("222");
e.onNext("333");
}
}).subscribe(new Consumer<String>() {
@Trace
@Override
public void accept(@NonNull String str) throws Exception {
}
});
}
DatabaseManager.java 文件源码
项目:simple-stack
阅读 29
收藏 0
点赞 0
评论 0
public void openDatabase() {
disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
final Realm observableRealm = Realm.getDefaultInstance();
final RealmChangeListener<Realm> listener = realm -> {
if(!emitter.isDisposed()) {
emitter.onNext(observableRealm);
}
};
observableRealm.addChangeListener(listener);
emitter.setDisposable(Disposables.fromAction(() -> {
observableRealm.removeChangeListener(listener);
observableRealm.close();
}));
emitter.onNext(observableRealm);
}).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe();
}
RxJsoup.java 文件源码
项目:RxRetroJsoup
阅读 25
收藏 0
点赞 0
评论 0
public Observable<String> text(final Element element, final String expression) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
final Elements elements = element.select(expression);
if (elements.isEmpty() && exceptionIfNotFound) {
observableEmitter.onError(new NotFoundException(expression, element.toString()));
} else {
if (elements.isEmpty()) {
observableEmitter.onNext("");
} else {
for (Element e : elements) {
observableEmitter.onNext(e.text());
}
}
observableEmitter.onComplete();
}
}
});
}
Create.java 文件源码
项目:RxJava4AndroidDemos
阅读 52
收藏 0
点赞 0
评论 0
@Override
public void test3() {
Log.i(TAG, "test3() Create simple demo, onNext() twice");
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onNext(String.valueOf(i));
}
}
});
for (int time = 0; time < 2; time++) {
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
});
}
}
DemoObservable.java 文件源码
项目:Reactive-Programming-With-Java-9
阅读 28
收藏 0
点赞 0
评论 0
public static void main(String[] args) {
Observable<String> month_observable = Observable.create(new
ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
// TODO Auto-generated method stub
try {
String[] monthArray = { "Jan", "Feb", "Mar",
"Apl", "May", "Jun", "July", "Aug",
"Sept", "Oct","Nov", "Dec" };
List<String> months = Arrays.asList(monthArray);
for (String month : months) {
emitter.onNext(month);
}
emitter.onComplete();
} catch (Exception e) {
// TODO: handle exception
emitter.onError(e);
}
}
});
month_observable.subscribe(s -> System.out.println(s));
}
ThrottleLastExampleActivity.java 文件源码
项目:RxJava2-Android-Sample
阅读 29
收藏 0
点赞 0
评论 0
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
TempStorageUtils.java 文件源码
项目:PXLSRT
阅读 29
收藏 0
点赞 0
评论 0
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
bitmap.recycle();
try {
fos.flush();
fos.close();
emitter.onNext(true);
} catch (IOException e) {
e.printStackTrace();
emitter.onError(e);
}
}
});
}
HistoryModel.java 文件源码
项目:Aurora
阅读 32
收藏 0
点赞 0
评论 0
@Override
public Observable<List<VideoDaoEntity>> getListFromNet(int start, String userid) {
return Observable.create((ObservableOnSubscribe<List<VideoDaoEntity>>) emitter -> {
BmobQuery<VideoDaoEntity> query = new BmobQuery<VideoDaoEntity>();
query.addWhereEqualTo("userId", userid);
query.setLimit(10);
query.order("-updatedAt");
query.setSkip(start);
query.findObjects(new FindListener<VideoDaoEntity>() {
@Override
public void done(List<VideoDaoEntity> list, BmobException e) {
List<VideoDaoEntity> infolist = new ArrayList<VideoDaoEntity>();
if (!StringUtils.isEmpty(list)) {
for (VideoDaoEntity entity1 : list) {
entity1.setVideo(mGson.fromJson(entity1.getBody(), VideoListInfo.Video.VideoData.class));
infolist.add(entity1);
}
}
emitter.onNext(infolist);
}
});
});
}
MountReceiver.java 文件源码
项目:Mount
阅读 25
收藏 0
点赞 0
评论 0
private void onActionPackageFullyRemoved(final Intent intent) {
Observable.create(
new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
// prefix "package:"
String packageName = intent.getData().toString().substring(8);
List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class);
for (PackageRecord record : list) {
if (TextUtils.equals(record.name, packageName)) {
record.delete();
}
}
e.onNext(true);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe();
}
RxJavaFragment.java 文件源码
项目:android-study
阅读 26
收藏 0
点赞 0
评论 0
/**
* sample操作符每隔指定的时间就从上游中取出一个事件发送给下游.
*/
private void doSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) { //模拟无限循环发送事件
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});
}
CourseStore.java 文件源码
项目:Dalaran
阅读 29
收藏 0
点赞 0
评论 0
/**
* 查找数据
*
* @param cacheId
* @return
*/
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() {
@Override
public void subscribe(ObservableEmitter<Course> e) throws Exception {
Util.logMethodThreadId("findDataByIdentifier");
long time = System.currentTimeMillis();
try {
Course result = (Course) mIDBEngine.find(cacheId, Course.class);
time = System.currentTimeMillis() - time;
Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null"));
if (result != null) {
e.onNext(result);
}
e.onComplete();
} catch (XDBException d) {
e.onError(d);
}
}
});
return courseObservable;
}