java类io.reactivex.ObservableOnSubscribe的实例源码

RxBeacon.java 文件源码 项目:RxBeacon 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Streams ranging results for the region returned by {@code getRegion()}.
 *
 * <p>Once {@code startup()} emits, a {@link RangeNotifier} is registered on the beacon manager
 * and ranging is started. Each ranging callback is forwarded downstream wrapped in an
 * {@link RxBeaconRange}. The inner source never signals completion on its own.
 *
 * @return observable of ranging results
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    final Function<Boolean, ObservableSource<RxBeaconRange>> toRangingStream =
            new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean started) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> rangeEmitter) throws Exception {
                            // Forward every ranging callback to the subscriber.
                            final RangeNotifier forwardingNotifier = new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> beacons, Region rangedRegion) {
                                    rangeEmitter.onNext(new RxBeaconRange(beacons, rangedRegion));
                                }
                            };
                            beaconManager.addRangeNotifier(forwardingNotifier);
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            };
    return startup().flatMap(toRangingStream);
}
ThrottleLastExampleActivity.java 文件源码 项目:GitHub 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Builds the demo source: the integers 1..7 emitted with fixed pauses in between,
 * followed by completion. The pauses block the subscribing thread.
 *
 * @return a cold observable that replays the same timed sequence for each subscriber
 */
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> sink) throws Exception {
            // Emit values separated by simulated waits (milliseconds).
            // The skip/deliver notes describe the expected outcome of the caller's operator.
            Thread.sleep(0);
            sink.onNext(1); // skip
            sink.onNext(2); // deliver
            Thread.sleep(505);
            sink.onNext(3); // skip
            Thread.sleep(99);
            sink.onNext(4); // skip
            Thread.sleep(100);
            sink.onNext(5); // skip
            sink.onNext(6); // deliver
            Thread.sleep(305);
            sink.onNext(7); // deliver
            Thread.sleep(510);
            sink.onComplete();
        }
    });
}
RecipeDatabaseHelper.java 文件源码 项目:BakingApp 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Bulk-inserts the given recipes through the content resolver on an io thread and reports
 * the outcome to {@code observer} on the Android main thread.
 *
 * <p>On success the number of inserted rows is emitted, followed by completion. When the
 * resolver reports zero inserted rows, a {@link NullPointerException} with the message
 * "Failed to insert" is delivered through {@code onError} and nothing else is signalled.
 *
 * @param recipes  recipes to insert
 * @param observer receiver of the inserted-row count or the failure
 */
public void insertRecipes(@NonNull final ArrayList<Recipe> recipes, Observer<Integer> observer){
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            final int recipeCount = recipes.size();
            ContentValues[] contentValues = new ContentValues[recipeCount];
            for (int i = 0; i < recipeCount; ++i) {
                contentValues[i] = buildContentValuesFromRecipe(recipes.get(i));
            }
            int recipesAdded = mContext.getContentResolver().bulkInsert(RecipesContract.RecipeEntry.CONTENT_URI, contentValues);
            if (recipesAdded == 0) {
                // onError is terminal: return so that onComplete is not signalled afterwards.
                e.onError(new NullPointerException("Failed to insert"));
                return;
            }
            e.onNext(recipesAdded);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
TaskRepository.java 文件源码 项目:simple-stack 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Wraps a Realm query in an observable that emits the mapped task list whenever the
 * query results change.
 *
 * <p>A Realm instance is opened per subscription. On disposal the change listener is removed
 * (if the results are still valid) and the Realm instance is closed. Subscription and
 * disposal both run on the scheduler supplied by {@code looperScheduler}.
 *
 * @param querySelector builds the query against the opened Realm
 * @return observable of mapped task lists
 */
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    final ObservableOnSubscribe<List<Task>> source = emitter -> {
        final Realm openedRealm = Realm.getDefaultInstance();
        final RealmResults<DbTask> results = querySelector.createQuery(openedRealm);
        final RealmChangeListener<RealmResults<DbTask>> onResultsChanged = changed -> {
            if (!changed.isLoaded() || emitter.isDisposed()) {
                return;
            }
            List<Task> mapped = mapFrom(changed);
            // Re-check: the subscriber may have been disposed while mapping.
            if (!emitter.isDisposed()) {
                emitter.onNext(mapped);
            }
        };
        // Register the cleanup before the listener so disposal always has something to undo.
        emitter.setDisposable(Disposables.fromAction(() -> {
            if (results.isValid()) {
                results.removeChangeListener(onResultsChanged);
            }
            openedRealm.close();
        }));
        results.addChangeListener(onResultsChanged);
    };
    return Observable.create(source)
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler());
}
DebounceExampleActivity.java 文件源码 项目:RxJava2-Android-Sample 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Builds the demo source: the integers 1..5 emitted with fixed pauses in between,
 * followed by completion. The pauses block the subscribing thread.
 *
 * @return a cold observable that replays the same timed sequence for each subscriber
 */
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> sink) throws Exception {
            // Emit values separated by simulated waits (milliseconds).
            // Timeline against a 500 ms window:
            // 1(drop) --400ms<500ms--> 2(pass) --600ms>500ms--> 3(drop) --100ms<500ms-->
            // 4(pass) --605ms>500ms--> 5(pass) --510ms--> complete
            sink.onNext(1); // skip
            Thread.sleep(400);
            sink.onNext(2); // deliver
            Thread.sleep(600);
            sink.onNext(3); // skip
            Thread.sleep(100);
            sink.onNext(4); // deliver
            Thread.sleep(605);
            sink.onNext(5); // deliver
            Thread.sleep(510);
            sink.onComplete();
        }
    });
}
IconShowPresenter.java 文件源码 项目:MBEStyle 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Parses {@code R.xml.drawable} on an io thread, collects one {@link IconBean} per tag whose
 * name starts with "item", and hands the complete list to {@code mView.onLoadData} on the
 * Android main thread.
 *
 * <p>No error consumer is supplied to {@code subscribe}, so failures raised while parsing
 * are not handled by this method.
 *
 * @return the subscription handle, so the caller can dispose the load
 */
public Disposable getAllIcons() {
    return Observable.create(new ObservableOnSubscribe<IconBean>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<IconBean> e) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG) {
                        if (xml.getName().startsWith("item")) {
                            IconBean bean = new IconBean();

                            String iconName = xml.getAttributeValue(null, "drawable");
                            bean.id = mView.getResources().getIdentifier(
                                    iconName, "drawable", BuildConfig.APPLICATION_ID);
                            bean.name = iconName;

                            e.onNext(bean);
                        }
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing fails part-way through.
                xml.close();
            }
            e.onComplete();
        }
    }).toList().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<IconBean>>() {
                @Override
                public void accept(List<IconBean> list) throws Exception {
                    mView.onLoadData(list);
                }
            });
}
MapActivity.java 文件源码 项目:Rxjava2.0Demo 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Demonstrates the {@code map} operator: emits 1, 2 and 3, converts each to a text line,
 * appends it to {@code info} and shows the accumulated text in {@code tv}.
 * No schedulers are applied and the source never signals completion.
 */
private void map() {
    final ObservableOnSubscribe<Integer> numbers = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    };
    final Function<Integer, String> describe = new Function<Integer, String>() {
        @Override
        public String apply(Integer value) throws Exception {
            return "This is result " + value;
        }
    };
    final Consumer<String> appendToView = new Consumer<String>() {
        @Override
        public void accept(String line) throws Exception {
            Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
            info += line + "\n";
            tv.setText(info);
        }
    };
    Observable.create(numbers).map(describe).subscribe(appendToView);
}
NetTest.java 文件源码 项目:OKHttpLoggingInterceptor 阅读 28 收藏 0 点赞 0 评论 0
/**
 * Minimal create/subscribe round trip: emits the single value 1 and prints it to stdout.
 * The source never signals completion.
 */
public void test()
{
    final ObservableOnSubscribe<Integer> singleValue = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception
        {
            emitter.onNext(1);
        }
    };
    final Consumer<Integer> printer = new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception
        {
            System.out.println(value);
        }
    };
    Observable.create(singleValue).subscribe(printer);
}
UnlimitPostActivity.java 文件源码 项目:GetStartRxJava2.0 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Floods the downstream with {@code Integer.MAX_VALUE} from an io thread and logs every value
 * on the Android main thread.
 *
 * <p>The producer keeps emitting without any pacing for as long as the emitter is not
 * disposed. Once the downstream is disposed the loop exits, so the io worker is released
 * instead of spinning forever.
 */
private void doRxJavaWork() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // Emit without bound, but stop as soon as nobody is listening any more.
            while (!emitter.isDisposed()) {
                emitter.onNext(Integer.MAX_VALUE);
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "" + integer);
                }
            });
}
BasicTest.java 文件源码 项目:Android-Code-Demos 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Builds a source that emits two fixed news strings and then completes.
 *
 * @return a cold observable of the two strings
 */
public Observable<String> getObservable() {
    final ObservableOnSubscribe<String> newsFeed = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Today's news update");
            emitter.onNext("Today's topic is Study");
            emitter.onComplete();
        }
    };
    return Observable.create(newsFeed);
    /* The next two factories behave alike; just delegates to fromArray internally. */
    // return Observable.just("Topic 1", "Heat 1", "News");
    // return Observable.fromArray("Topic 1", "Heat 1", "News");
    /* fromCallable can only emit a single item. */
    /*return Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "Topic is Study";
        }
    });*/
}
ConfigPresenter.java 文件源码 项目:NeteaseCloudMusic 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Loads the configuration list and displays it in {@code configView}.
 *
 * <p>The model lookup and {@code getConfigList()} run on an io thread
 * ({@code subscribeOn}); the list is delivered to {@code configView.displayConfigList} on the
 * Android main thread ({@code observeOn}). After the list has been emitted this presenter is
 * registered as the model's config callback.
 */
public void requestLoadingList() {
    Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
            mModel = ConfigModel.getInstance(configView.getContext());
            e.onNext(mModel.getConfigList());
            mModel.setConfigCallback(ConfigPresenter.this);
        }
    })
            // Do the loading off the main thread ...
            .subscribeOn(Schedulers.io())
            // ... and touch the view only on the main thread.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ConfigBean>>() {
                @Override
                public void accept(List<ConfigBean> list) throws Exception {
                    configView.displayConfigList(list);
                }
            });

}
RxClick.java 文件源码 项目:AndroidMVPresenter 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Turns clicks on {@code view} into an observable stream.
 *
 * <p>The click listener is installed from the main looper and replaces any listener already
 * set on the view. When the subscription is disposed, the listener is cleared again (also
 * from the main looper) so the view no longer holds on to the emitter. The stream never
 * completes on its own.
 *
 * @param view the view whose clicks are observed
 * @return observable emitting the clicked view for every click
 */
public static Observable<View> with(final View view) {
    return Observable.create(new ObservableOnSubscribe<View>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
            final Handler mainHandler = new Handler(Looper.getMainLooper());
            mainHandler.post(new Runnable() {
                @Override
                public void run() {
                    // Disposed before the listener could be installed: nothing to do.
                    if (e.isDisposed()) {
                        return;
                    }
                    view.setOnClickListener(new View.OnClickListener() {
                        @Override
                        public void onClick(View value) {
                            e.onNext(value);
                        }
                    });
                }
            });
            // Posted after the install runnable, so the main looper runs it later in FIFO order.
            e.setCancellable(new io.reactivex.functions.Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mainHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            view.setOnClickListener(null);
                        }
                    });
                }
            });
        }
    });
}
TingPlayProcessor.java 文件源码 项目:AssistantBySDK 阅读 34 收藏 0 点赞 0 评论 0
/**
 * Looks up albums by top-level category.
 *
 * <p>The request is issued via {@code CommonRequest.getAlbumList} with subscription on the io
 * scheduler. A successful callback emits the album list and completes; a failure callback
 * signals an error whose message is the code and text joined by a space.
 *
 * @param cateId         category id sent as {@code CATEGORY_ID}
 * @param calc_dimension value sent as {@code CALC_DIMENSION}
 * @return observable emitting the albums of the category
 */
private Observable<List<Album>> getAlbumByCate(String cateId, int calc_dimension) {
    final Map<String, String> requestParams = new HashMap<>();
    requestParams.put(DTransferConstants.CATEGORY_ID, cateId);
    requestParams.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
    final ObservableOnSubscribe<List<Album>> request = new ObservableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<Album>> emitter) throws Exception {
            final IDataCallBack<AlbumList> callback = new IDataCallBack<AlbumList>() {
                @Override
                public void onSuccess(AlbumList result) {
                    // onNext does not accept a null argument.
                    emitter.onNext(result.getAlbums());
                    emitter.onComplete();
                }

                @Override
                public void onError(int code, String message) {
                    emitter.onError(new Throwable(code + " " + message));
                }
            };
            CommonRequest.getAlbumList(requestParams, callback);
        }
    };
    return Observable.create(request).subscribeOn(Schedulers.io());
}
TingPlayProcessor.java 文件源码 项目:AssistantBySDK 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Looks up albums by search keyword.
 *
 * <p>The request is issued via {@code CommonRequest.getSearchedAlbums} with subscription on
 * the io scheduler. A successful callback emits the album list and completes; a failure
 * callback signals an error whose message is the code and text joined by a space.
 *
 * @param keyword        text sent as {@code SEARCH_KEY}
 * @param cateId         category id sent as {@code CATEGORY_ID}
 * @param calc_dimension value sent as {@code CALC_DIMENSION}
 * @return observable emitting the matching albums
 */
private Observable<List<Album>> getAlbumByKeyWord(String keyword, String cateId, int calc_dimension) {
    final Map<String, String> requestParams = new HashMap<>();
    requestParams.put(DTransferConstants.SEARCH_KEY, keyword);
    requestParams.put(DTransferConstants.CATEGORY_ID, cateId);
    requestParams.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
    final ObservableOnSubscribe<List<Album>> request = new ObservableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<Album>> emitter) throws Exception {
            final IDataCallBack<SearchAlbumList> callback = new IDataCallBack<SearchAlbumList>() {
                @Override
                public void onSuccess(SearchAlbumList result) {
                    emitter.onNext(result.getAlbums());
                    emitter.onComplete();
                }

                @Override
                public void onError(int code, String message) {
                    emitter.onError(new Throwable(code + " " + message));
                }
            };
            CommonRequest.getSearchedAlbums(requestParams, callback);
        }
    };
    return Observable.create(request).subscribeOn(Schedulers.io());
}
StandardBuildsService.java 文件源码 项目:starcraft-2-build-player 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Creates an observable that reports progress while the stock build orders are loaded into
 * the local SQLite DB. Should be scheduled on a worker thread.
 *
 * <p>Progress values are forwarded only while the emitter is not disposed. Completion is
 * signalled once loading returns; any exception thrown during loading is routed to
 * {@code onError}.
 *
 * @param c context
 * @param forceLoad if false, builds are only copied if an upgrade is required. If true,
 *                  standard builds are always copied.
 * @return observable on load progress (percentage)
 */
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
            try {
                // Nothing to do if the subscriber is already gone.
                if (emitter.isDisposed()) {
                    return;
                }
                final DbAdapter.ProgressListener forwardProgress = new DbAdapter.ProgressListener() {
                    @Override
                    public void onProgressUpdate(int percent) {
                        if (emitter.isDisposed()) {
                            return;
                        }
                        emitter.onNext(percent);
                    }
                };
                loadStandardBuildsIntoDB(c, forceLoad, forwardProgress);
                emitter.onComplete();
            } catch (Exception failure) {
                emitter.onError(failure);
            }
        }
    });
}
ReactiveAirplaneMode.java 文件源码 项目:ReactiveAirplaneMode 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Observes the Airplane Mode state of the device through a BroadcastReceiver.
 * The returned RxJava2 Observable emits true if airplane mode turns on and false otherwise.
 *
 * <p>The receiver is registered on subscription. On disposal it is unregistered through the
 * action wrapped by {@code disposeInUiThread}.
 *
 * @param context of the Application or Activity
 * @return RxJava2 Observable with Boolean value indicating state of the airplane mode
 */
public Observable<Boolean> observe(final Context context) {
  checkContextIsNotNull(context);
  final IntentFilter airplaneModeFilter = createIntentFilter();

  final ObservableOnSubscribe<Boolean> source = new ObservableOnSubscribe<Boolean>() {
    @Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
        throws Exception {
      final BroadcastReceiver stateReceiver = createBroadcastReceiver(emitter);
      context.registerReceiver(stateReceiver, airplaneModeFilter);

      // Undo the registration when the subscriber goes away.
      emitter.setDisposable(disposeInUiThread(new Action() {
        @Override public void run() throws Exception {
          tryToUnregisterReceiver(stateReceiver, context);
        }
      }));
    }
  };
  return Observable.create(source);
}
AccountingActivity.java 文件源码 项目:AssistantBySDK 阅读 105 收藏 0 点赞 0 评论 0
/**
 * Updates the balance and today's income/expenses.
 *
 * <p>{@code CountToday()} and {@code CountBalance(type, taskcards)} run on an io thread. When
 * they finish, the balance label is refreshed on the Android main thread (only if the
 * {@code HAS_AMOUNT} preference is true) and adapter item 0 is notified as changed.
 *
 * @param type      passed through to {@code CountBalance}
 * @param taskcards passed through to {@code CountBalance}
 */
public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) {
    final ObservableOnSubscribe<Integer> recalculation = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            CountToday();
            CountBalance(type, taskcards);
            // The value itself is unused; it only signals that the recalculation finished.
            emitter.onNext(0);
        }
    };
    final Consumer<Integer> refreshViews = new Consumer<Integer>() {
        @Override
        public void accept(Integer ignored) throws Exception {
            if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false)) {
                mTvBalance.setText("¥" + AssistUtils.formatAmount(balance));
            }
            mAdapter.notifyItemChanged(0);
        }
    };
    Observable.create(recalculation)
            .subscribeOn(Schedulers.io())   // thread on which subscribe() runs
            .observeOn(AndroidSchedulers.mainThread())  // thread on which the subscriber is notified
            .subscribe(refreshViews);
}
RxGps.java 文件源码 项目:RxGps 阅读 42 收藏 0 点赞 0 评论 0
/**
 * Checks whether Google Play Services are available for the referenced activity.
 *
 * <p>Emits {@code true} and completes when the status is {@code ConnectionResult.SUCCESS};
 * signals a {@code PlayServicesNotAvailableException} for any other status. When the activity
 * reference has already been cleared, the stream completes without emitting, so subscribers
 * are not left waiting forever.
 *
 * @return observable of the availability check
 */
private Observable<Boolean> checkPlayServicesAvailable() {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
            final Activity activity = activityReference.get();
            if (activity == null) {
                // The activity is gone: terminate instead of leaving the stream open.
                e.onComplete();
                return;
            }

            final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
            final int status = apiAvailability.isGooglePlayServicesAvailable(activity);

            if (status != ConnectionResult.SUCCESS) {
                e.onError(new PlayServicesNotAvailableException());
            } else {
                e.onNext(true);
                e.onComplete();
            }
        }
    });
}
RxJavaUnitTest.java 文件源码 项目:code-examples-android-expert 阅读 38 收藏 0 点赞 0 评论 0
/**
 * Verifies that a missing todo list surfaces as a {@link NullPointerException} through
 * {@code onError}.
 *
 * <p>The source emits every todo returned by {@code getTodos()} and completes, or signals
 * the NullPointerException when the list is null.
 * NOTE(review): the assertion presumes {@code getTodos()} returns null in this test — confirm.
 */
@Test public void test(){
    Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
        @Override
        public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
            try {
                List<Todo> todos = RxJavaUnitTest.this.getTodos();
                // Fail only when the list is actually missing (the check used to be inverted).
                if (todos == null){
                    throw new NullPointerException("todos was null");
                }
                for (Todo todo : todos) {
                    emitter.onNext(todo);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });
    TestObserver<Object> testObserver = new TestObserver<>();
    todoObservable.subscribeWith(testObserver);
    testObserver.assertError(NullPointerException.class);

}
DemoForTraceActivity.java 文件源码 项目:SAF-AOP 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Emits three fixed strings to a consumer that does nothing with them.
 * Both callbacks carry {@code @Trace}; this method itself has tracing disabled.
 */
@Trace(enable = false)
private void initData() {

    Observable.create(new ObservableOnSubscribe<String>() {

        @Trace
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {

            emitter.onNext("111");
            emitter.onNext("222");
            emitter.onNext("333");

        }
    }).subscribe(new Consumer<String>() {

        @Trace
        @Override
        public void accept(@NonNull String value) throws Exception {
            // Intentionally empty: the values are not used.
        }
    });
}
DatabaseManager.java 文件源码 项目:simple-stack 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Opens the default Realm and keeps it open for as long as {@code disposable} is not
 * disposed.
 *
 * <p>The Realm instance is emitted once immediately and again on every change notification.
 * Disposal removes the change listener and closes the instance. Subscription and disposal
 * both run on the scheduler supplied by {@code looperScheduler}.
 */
public void openDatabase() {
    final ObservableOnSubscribe<Realm> realmSource = emitter -> {
        final Realm openedRealm = Realm.getDefaultInstance();
        final RealmChangeListener<Realm> onRealmChanged = changed -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(openedRealm);
        };
        openedRealm.addChangeListener(onRealmChanged);
        emitter.setDisposable(Disposables.fromAction(() -> {
            openedRealm.removeChangeListener(onRealmChanged);
            openedRealm.close();
        }));
        // Initial emission, before any change notification arrives.
        emitter.onNext(openedRealm);
    };
    disposable = Observable.create(realmSource)
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler())
            .subscribe();
}
RxJsoup.java 文件源码 项目:RxRetroJsoup 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Emits the text of every element matched by {@code expression} below {@code element}.
 *
 * <p>When nothing matches, the outcome depends on {@code exceptionIfNotFound}: if it is set,
 * a {@code NotFoundException} is signalled; otherwise a single empty string is emitted.
 * The stream completes in every non-error case.
 *
 * @param element    root element to select from
 * @param expression selector passed to {@code element.select}
 * @return observable of the matched elements' text
 */
public Observable<String> text(final Element element, final String expression) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            final Elements matches = element.select(expression);
            final boolean nothingFound = matches.isEmpty();

            if (nothingFound && exceptionIfNotFound) {
                emitter.onError(new NotFoundException(expression, element.toString()));
                return;
            }

            if (nothingFound) {
                emitter.onNext("");
            } else {
                for (Element match : matches) {
                    emitter.onNext(match.text());
                }
            }
            emitter.onComplete();
        }
    });
}
Create.java 文件源码 项目:RxJava4AndroidDemos 阅读 52 收藏 0 点赞 0 评论 0
/**
 * Simple {@code create} demo: a cold source emitting "0", "1" and "2" is subscribed twice,
 * and each subscriber logs every value it receives. The source never signals completion.
 */
@Override
public void test3() {
    Log.i(TAG, "test3() Create simple demo, onNext() twice");
    final Observable<String> counter = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            int next = 0;
            while (next < 3) {
                emitter.onNext(String.valueOf(next));
                next++;
            }
        }
    });
    // Each subscription re-runs the emitter from the start.
    int subscriptions = 0;
    while (subscriptions < 2) {
        counter.subscribe(new Consumer<String>() {
            @Override
            public void accept(String value) throws Exception {
                Log.d(TAG, "Consumer<String> accept() s: " + value);
            }
        });
        subscriptions++;
    }
}
DemoObservable.java 文件源码 项目:Reactive-Programming-With-Java-9 阅读 28 收藏 0 点赞 0 评论 0
/**
 * Emits twelve month abbreviations through a hand-written observable and prints each one
 * to stdout.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final ObservableOnSubscribe<String> monthSource = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            try {
                final List<String> months = Arrays.asList(
                        "Jan", "Feb", "Mar", "Apl", "May", "Jun",
                        "July", "Aug", "Sept", "Oct", "Nov", "Dec");
                for (String name : months) {
                    emitter.onNext(name);
                }
                emitter.onComplete();
            } catch (Exception failure) {
                // Forward any failure to the subscriber instead of throwing.
                emitter.onError(failure);
            }
        }
    };
    Observable<String> month_observable = Observable.create(monthSource);
    month_observable.subscribe(month -> System.out.println(month));
}
ThrottleLastExampleActivity.java 文件源码 项目:RxJava2-Android-Sample 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Builds the demo source: the integers 1..7 emitted with fixed pauses in between,
 * followed by completion. The pauses block the subscribing thread.
 *
 * @return a cold observable that replays the same timed sequence for each subscriber
 */
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> sink) throws Exception {
            // Emit values separated by simulated waits (milliseconds).
            // The skip/deliver notes describe the expected outcome of the caller's operator.
            Thread.sleep(0);
            sink.onNext(1); // skip
            sink.onNext(2); // deliver
            Thread.sleep(505);
            sink.onNext(3); // skip
            Thread.sleep(99);
            sink.onNext(4); // skip
            Thread.sleep(100);
            sink.onNext(5); // skip
            sink.onNext(6); // deliver
            Thread.sleep(305);
            sink.onNext(7); // deliver
            Thread.sleep(510);
            sink.onComplete();
        }
    });
}
TempStorageUtils.java 文件源码 项目:PXLSRT 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Compresses {@code bitmap} as JPEG into {@code fos}, recycles the bitmap, then flushes and
 * closes the stream.
 *
 * <p>Emits {@code true} and completes once the stream has been closed. An
 * {@link IOException} from flushing or closing is delivered through {@code onError}. The
 * stream is closed even when compressing or flushing fails.
 *
 * @param fos     destination stream; closed by this method
 * @param bitmap  image to store; recycled after it has been compressed
 * @param quality JPEG quality passed to {@code Bitmap.compress}
 * @return observable signalling that the file was stored
 */
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            try {
                try {
                    bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
                    bitmap.recycle();
                    fos.flush();
                } finally {
                    // Always release the file handle, even if compress/flush failed.
                    fos.close();
                }
            } catch (IOException e) {
                emitter.onError(e);
                return;
            }
            emitter.onNext(true);
            emitter.onComplete();
        }
    });
}
HistoryModel.java 文件源码 项目:Aurora 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Queries one page of up to 10 {@code VideoDaoEntity} records for a user, ordered by
 * {@code -updatedAt}, and emits them with their {@code video} field populated from the JSON
 * in {@code body}.
 *
 * <p>An empty list is emitted when the query returns nothing. The {@code BmobException}
 * passed to the callback is not inspected, and the stream never signals completion.
 *
 * @param start  number of records to skip
 * @param userid value matched against the {@code userId} column
 * @return observable emitting the page of entities
 */
@Override
public Observable<List<VideoDaoEntity>> getListFromNet(int start, String userid) {
    final ObservableOnSubscribe<List<VideoDaoEntity>> pageSource = emitter -> {
        final BmobQuery<VideoDaoEntity> historyQuery = new BmobQuery<VideoDaoEntity>();
        historyQuery.addWhereEqualTo("userId", userid);
        historyQuery.setLimit(10);
        historyQuery.order("-updatedAt");
        historyQuery.setSkip(start);

        final FindListener<VideoDaoEntity> onFound = new FindListener<VideoDaoEntity>() {
            @Override
            public void done(List<VideoDaoEntity> found, BmobException e) {
                final List<VideoDaoEntity> page = new ArrayList<VideoDaoEntity>();
                if (!StringUtils.isEmpty(found)) {
                    for (VideoDaoEntity entity : found) {
                        // Decode the stored JSON body into the entity's video field.
                        entity.setVideo(mGson.fromJson(entity.getBody(), VideoListInfo.Video.VideoData.class));
                        page.add(entity);
                    }
                }
                emitter.onNext(page);
            }
        };
        historyQuery.findObjects(onFound);
    };
    return Observable.create(pageSource);
}
MountReceiver.java 文件源码 项目:Mount 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Deletes every stored {@code PackageRecord} whose name equals the package named in the
 * intent's data. The work runs on a new thread and the result is not observed.
 *
 * @param intent broadcast intent; the first 8 characters of its data string are dropped
 */
private void onActionPackageFullyRemoved(final Intent intent) {
    final ObservableOnSubscribe<Boolean> cleanup = new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            // Strip the "package:" prefix.
            final String removedPackage = intent.getData().toString().substring(8);
            final List<PackageRecord> records = PackageRecord.listAll(PackageRecord.class);
            for (PackageRecord candidate : records) {
                if (!TextUtils.equals(candidate.name, removedPackage)) {
                    continue;
                }
                candidate.delete();
            }

            emitter.onNext(true);
            emitter.onComplete();
        }
    };
    Observable.create(cleanup)
            .subscribeOn(Schedulers.newThread())
            .subscribe();
}
RxJavaFragment.java 文件源码 项目:android-study 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Demonstrates the sample operator, which at each fixed interval takes one event from the
 * upstream and passes it downstream.
 *
 * <p>The integers 0..999 are emitted from an io thread, sampled every millisecond, and the
 * sampled values are logged on the Android main thread.
 */
private void doSample() {
  final ObservableOnSubscribe<Integer> burst = new ObservableOnSubscribe<Integer>() {
    @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
      // Stand-in for an endless stream of events.
      int value = 0;
      while (value < 1000) {
        emitter.onNext(value);
        value++;
      }
    }
  };
  final Consumer<Integer> logSample = new Consumer<Integer>() {
    @Override public void accept(Integer sampled) throws Exception {
      Log.d(TAG, "" + sampled);
    }
  };
  Observable.create(burst)
      .subscribeOn(Schedulers.io())
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(logSample);
}
CourseStore.java 文件源码 项目:Dalaran 阅读 29 收藏 0 点赞 0 评论 0
/**
 * Looks up a cached {@code Course} by its identifier.
 *
 * <p>The lookup goes through {@code mIDBEngine.find} and its duration is logged. The course
 * is emitted only when one was found; the stream then completes either way. An
 * {@code XDBException} from the lookup is delivered through {@code onError}.
 *
 * @param cacheId identifier of the cached entry
 * @return observable emitting the cached course, or completing empty when there is none
 */
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
    return Observable.create(new ObservableOnSubscribe<Course>() {
        @Override
        public void subscribe(ObservableEmitter<Course> emitter) throws Exception {
            Util.logMethodThreadId("findDataByIdentifier");
            final long startedAt = System.currentTimeMillis();
            try {
                final Course cached = (Course) mIDBEngine.find(cacheId, Course.class);
                final long elapsed = System.currentTimeMillis() - startedAt;
                Util.log("<-- End getCache2Disk(" + elapsed + "):" + "[identifier] = " + cacheId + " [data] = " + (cached != null ? cached.getData() : "null"));
                if (cached != null) {
                    emitter.onNext(cached);
                }
                emitter.onComplete();
            } catch (XDBException dbFailure) {
                emitter.onError(dbFailure);
            }
        }
    });
}


问题


面经


文章

微信
公众号

扫码关注公众号