/**
 * Builds an Observable that relays the classes reported by the given store's subscription.
 * Per the original contract: once a transaction is committed, subscribers are told which
 * classes had changed objects.
 *
 * The underlying {@link DataSubscription} is cancelled when the emitter is cancelled.
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            // Forwards each reported class downstream unless the emitter is already disposed.
            final DataObserver<Class> forwarder = new DataObserver<Class>() {
                @Override
                public void onData(Class changedClass) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(changedClass);
                }
            };
            final DataSubscription storeSubscription = boxStore.subscribe().observer(forwarder);

            final Cancellable releaseSubscription = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    storeSubscription.cancel();
                }
            };
            emitter.setCancellable(releaseSubscription);
        }
    });
}
java类io.reactivex.functions.Cancellable的实例源码
RxBoxStore.java 文件源码
项目:ObjectBoxRxJava
阅读 36
收藏 0
点赞 0
评论 0
RxQuery.java 文件源码
项目:ObjectBoxRxJava
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Subscribes to the query and pushes every element of each delivered result list to the
 * emitter one by one, completing the emitter after the list has been fully emitted.
 * Emission stops as soon as the emitter reports that it is cancelled. The query
 * subscription is cancelled together with the emitter.
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataObserver<List<T>> itemForwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> results) {
            for (T item : results) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription querySubscription = query.subscribe().observer(itemForwarder);

    final Cancellable releaseSubscription = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            querySubscription.cancel();
        }
    };
    emitter.setCancellable(releaseSubscription);
}
RxQuery.java 文件源码
项目:ObjectBoxRxJava
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Builds an Observable that emits each result list delivered by the query's subscription.
 * This method never calls onComplete on the emitter, so (per the original contract)
 * subscribers keep receiving updates when the underlying data changes.
 *
 * The query subscription is cancelled when the emitter is cancelled.
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            // Relays each result list unless the downstream is already disposed.
            final DataObserver<List<T>> forwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> results) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(results);
                }
            };
            final DataSubscription querySubscription = query.subscribe().observer(forwarder);

            final Cancellable releaseSubscription = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    querySubscription.cancel();
                }
            };
            emitter.setCancellable(releaseSubscription);
        }
    });
}
RxQuery.java 文件源码
项目:ObjectBoxRxJava
阅读 34
收藏 0
点赞 0
评论 0
/**
 * Builds a Single that delivers one query result as a List, using a
 * {@code subscribe().single()} subscription on the query.
 *
 * The query subscription is cancelled when the emitter is cancelled.
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            // Hands the result to the downstream unless it is already disposed.
            final DataObserver<List<T>> resultForwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> results) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onSuccess(results);
                }
            };
            final DataSubscription querySubscription =
                    query.subscribe().single().observer(resultForwarder);

            final Cancellable releaseSubscription = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    querySubscription.cancel();
                }
            };
            emitter.setCancellable(releaseSubscription);
        }
    });
}
RxLocationFlowableOnSubscribe.java 文件源码
项目:RxJava2-weather-example
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Creates an API client wired to the emitter, attempts to connect it, and registers a
 * cancellation action that disconnects the client. Any Throwable raised by
 * {@code connect()} is forwarded to the emitter; the cancellation action is registered
 * regardless of whether connecting failed.
 */
@Override
public final void subscribe(FlowableEmitter<T> emitter) throws Exception {
    final GoogleApiClient client = createApiClient(new ApiClientConnectionCallbacks(emitter));
    try {
        client.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable disconnectOnCancel = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only notify the subclass hook while the client is still connected.
            final boolean stillConnected = client.isConnected();
            if (stillConnected) {
                RxLocationFlowableOnSubscribe.this.onUnsubscribed(client);
            }
            client.disconnect();
        }
    };
    emitter.setCancellable(disconnectOnCancel);
}
RxLocationMaybeOnSubscribe.java 文件源码
项目:RxJava2-weather-example
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Creates an API client wired to the emitter, attempts to connect it, and registers a
 * cancellation action that disconnects the client. Any Throwable raised by
 * {@code connect()} is forwarded to the emitter; the cancellation action is registered
 * regardless of whether connecting failed.
 */
@Override
public final void subscribe(MaybeEmitter<T> emitter) throws Exception {
    final GoogleApiClient client = createApiClient(new ApiClientConnectionCallbacks(emitter));
    try {
        client.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable disconnectOnCancel = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only notify the subclass hook while the client is still connected.
            final boolean stillConnected = client.isConnected();
            if (stillConnected) {
                RxLocationMaybeOnSubscribe.this.onUnsubscribed(client);
            }
            client.disconnect();
        }
    };
    emitter.setCancellable(disconnectOnCancel);
}
RxLocationSingleOnSubscribe.java 文件源码
项目:RxJava2-weather-example
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Creates an API client wired to the emitter, attempts to connect it, and registers a
 * cancellation action that disconnects the client. Any Throwable raised by
 * {@code connect()} is forwarded to the emitter; the cancellation action is registered
 * regardless of whether connecting failed.
 */
@Override
public final void subscribe(SingleEmitter<T> emitter) throws Exception {
    final GoogleApiClient client = createApiClient(new ApiClientConnectionCallbacks(emitter));
    try {
        client.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable disconnectOnCancel = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only notify the subclass hook while the client is still connected.
            final boolean stillConnected = client.isConnected();
            if (stillConnected) {
                RxLocationSingleOnSubscribe.this.onUnsubscribed(client);
            }
            client.disconnect();
        }
    };
    emitter.setCancellable(disconnectOnCancel);
}
RxLocation.java 文件源码
项目:RxLocation
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Emits periodic location updates.
 *
 * The returned observable never calls onComplete(), so callers must unsubscribe manually.
 *
 * Note for requests configured with setExpirationDuration(), setNumUpdates() or
 * setExpirationTime(): the observable does not terminate automatically in those cases; it
 * simply stops emitting new items and keeps its resources until it is unsubscribed.
 *
 * @return an Observable of Location items.
 */
@SuppressWarnings("WeakerAccess") // It's an entry point.
public static Observable<Location> locationUpdates(final Context context,
        final LocationRequest locationRequest) {
    return Observable.create(new ObservableOnSubscribe<Location>() {
        @Override
        public void subscribe(ObservableEmitter<Location> emitter) throws Exception {
            final LocationUpdatesHelper helper = new LocationUpdatesHelper(
                    context,
                    new GoogleApiClientFactoryImpl(),
                    new FusedLocationProviderFactoryImpl(),
                    emitter,
                    locationRequest);

            // Cancellation is registered before the helper is started.
            final Cancellable stopHelper = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    helper.stop();
                }
            };
            emitter.setCancellable(stopHelper);

            helper.start();
        }
    });
}
RxLocation.java 文件源码
项目:RxLocation
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Yields the last location available to the system.
 *
 * Internally an Observable driven by a {@code LastLocationHelper} is created and then
 * converted with {@code singleOrError()}, so the caller receives a {@link Single}.
 *
 * @return a Single that delivers one Location item.
 */
@SuppressWarnings("WeakerAccess") // It's an entry point.
public static Single<Location> lastLocation(final Context context) {
    final Observable<Location> lastLocationSource =
            Observable.create(new ObservableOnSubscribe<Location>() {
                @Override
                public void subscribe(ObservableEmitter<Location> emitter) throws Exception {
                    final LastLocationHelper helper = new LastLocationHelper(
                            context,
                            new GoogleApiClientFactoryImpl(),
                            new FusedLocationProviderFactoryImpl(),
                            emitter);

                    // Cancellation is registered before the helper is started.
                    final Cancellable stopHelper = new Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            helper.stop();
                        }
                    };
                    emitter.setCancellable(stopHelper);

                    helper.start();
                }
            });
    return lastLocationSource.singleOrError();
}
RxArrayDisk.java 文件源码
项目:Floppy
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Returns an Observable that emits every object passed to the write listener registered
 * for {@code key}.
 *
 * On subscription a write listener is added for the key; when the emitter is cancelled
 * the listener for that key is removed via {@code removeListener(key)}.
 *
 * @param key the key whose writes should be observed
 * @return an Observable of the written objects
 */
public <T> Observable<T> observe(final String key) {
    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> e) throws Exception {
            addOnWriteListener(key, new OnWriteListener<T>() {
                @Override
                public void onWrite(T object) {
                    if (!e.isDisposed()) {
                        e.onNext(object);
                    }
                }
            });
            // Register the cleanup exactly once, at subscription time. Doing this inside
            // onWrite (as before) meant no cleanup existed until the first write, and each
            // later call replaced the previous Cancellable, which RxJava cancels on
            // replacement, so the listener was removed as a side effect of the second write.
            // Registered after the listener is added, so that an emitter which is already
            // disposed at this point still triggers the removal.
            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    removeListener(key);
                }
            });
        }
    });
}
AutoDisposeMaybeObserverTest.java 文件源码
项目:AutoDispose
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Checks that the source's Cancellable runs exactly once when the lifecycle scope fires,
 * and that the scope no longer has observers afterwards.
 */
@Test
public void verifyCancellation() {
    final AtomicInteger cancelCount = new AtomicInteger();
    Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
        @Override
        public void subscribe(MaybeEmitter<Integer> emitter) {
            // Count every cancellation of this source.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() {
                    cancelCount.incrementAndGet();
                }
            });
        }
    });
    MaybeSubject<Integer> scope = MaybeSubject.create();

    source.as(AutoDispose.<Integer>autoDisposable(scope)).subscribe();

    // Nothing is cancelled while the scope is still open.
    assertThat(cancelCount.get()).isEqualTo(0);
    assertThat(scope.hasObservers()).isTrue();

    scope.onSuccess(0);

    // Ending the scope must cancel the source and detach from the scope.
    assertThat(cancelCount.get()).isEqualTo(1);
    assertThat(scope.hasObservers()).isFalse();
}
AutoDisposeCompletableObserverTest.java 文件源码
项目:AutoDispose
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Checks that the source's Cancellable runs exactly once when the lifecycle scope fires,
 * and that the scope no longer has observers afterwards.
 */
@Test
public void verifyCancellation() {
    final AtomicInteger cancelCount = new AtomicInteger();
    Completable source = Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) {
            // Count every cancellation of this source.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() {
                    cancelCount.incrementAndGet();
                }
            });
        }
    });
    MaybeSubject<Integer> scope = MaybeSubject.create();

    source.as(autoDisposable(scope)).subscribe();

    // Nothing is cancelled while the scope is still open.
    assertThat(cancelCount.get()).isEqualTo(0);
    assertThat(scope.hasObservers()).isTrue();

    scope.onSuccess(0);

    // Ending the scope must cancel the source and detach from the scope.
    assertThat(cancelCount.get()).isEqualTo(1);
    assertThat(scope.hasObservers()).isFalse();
}
AutoDisposeSingleObserverTest.java 文件源码
项目:AutoDispose
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Checks that the source's Cancellable runs exactly once when the lifecycle scope fires,
 * and that the scope no longer has observers afterwards.
 */
@Test
public void verifyCancellation() {
    final AtomicInteger cancelCount = new AtomicInteger();
    Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> emitter) {
            // Count every cancellation of this source.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() {
                    cancelCount.incrementAndGet();
                }
            });
        }
    });
    MaybeSubject<Integer> scope = MaybeSubject.create();

    source.as(AutoDispose.<Integer>autoDisposable(scope)).subscribe();

    // Nothing is cancelled while the scope is still open.
    assertThat(cancelCount.get()).isEqualTo(0);
    assertThat(scope.hasObservers()).isTrue();

    scope.onSuccess(0);

    // Ending the scope must cancel the source and detach from the scope.
    assertThat(cancelCount.get()).isEqualTo(1);
    assertThat(scope.hasObservers()).isFalse();
}
AppStateObservableOnSubscribe.java 文件源码
项目:RxAppState
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Bridges the recognizer's foreground/background callbacks to the emitter: FOREGROUND is
 * emitted when the app enters the foreground and BACKGROUND when it enters the background.
 * On cancellation the listener is removed and the recognizer is stopped.
 */
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
    final AppStateListener stateForwarder = new AppStateListener() {
        @Override
        public void onAppDidEnterForeground() {
            appStateEmitter.onNext(FOREGROUND);
        }

        @Override
        public void onAppDidEnterBackground() {
            appStateEmitter.onNext(BACKGROUND);
        }
    };

    // Cancellation is registered before the listener is attached and the recognizer started.
    final Cancellable detachAndStop = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            recognizer.removeListener(stateForwarder);
            recognizer.stop();
        }
    };
    appStateEmitter.setCancellable(detachAndStop);

    recognizer.addListener(stateForwarder);
    recognizer.start();
}
RxValue.java 文件源码
项目:rxfirebase
阅读 38
收藏 0
点赞 0
评论 0
/**
 * Creates a Single backed by a single-value event listener on the given query.
 *
 * On subscription a listener is obtained from {@code listener(emit)} (presumably it
 * forwards the snapshot or error to the emitter; confirm in that helper), its removal is
 * registered as the cancellation action, and it is then attached with
 * {@code addListenerForSingleValueEvent}.
 *
 * @param query the query to attach the single-value listener to
 * @return a Single of the resulting {@link DataSnapshot}
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    return Single.create(new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = listener(emit);

            // Cancellation is registered before the listener is attached.
            final Cancellable detachListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detachListener);

            query.addListenerForSingleValueEvent(valueListener);
        }
    });
}
FingerprintObservable.java 文件源码
项目:RxFingerprint
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Starts a fingerprint authentication for the given emitter.
 *
 * If the fingerprint API is unavailable, a {@code FingerprintUnavailableException} is
 * sent to the emitter and nothing else happens. Otherwise a callback, a fresh cancellation
 * signal (stored in the {@code cancellationSignal} field) and a crypto object are created,
 * authentication is started, and a cancellation action is registered that cancels the
 * signal if it is present and not yet cancelled.
 */
@Override
@RequiresPermission(USE_FINGERPRINT)
@RequiresApi(Build.VERSION_CODES.M)
public void subscribe(ObservableEmitter<T> emitter) throws Exception {
    if (fingerprintApiWrapper.isUnavailable()) {
        emitter.onError(new FingerprintUnavailableException("Fingerprint authentication is not available on this device! Ensure that the device has a Fingerprint sensor and enrolled Fingerprints by calling RxFingerprint#isAvailable(Context) first"));
        return;
    }

    AuthenticationCallback authCallback = createAuthenticationCallback(emitter);
    cancellationSignal = fingerprintApiWrapper.createCancellationSignal();
    CryptoObject crypto = initCryptoObject(emitter);
    //noinspection MissingPermission
    fingerprintApiWrapper.getFingerprintManager()
            .authenticate(crypto, cancellationSignal, 0, authCallback, null);

    final Cancellable cancelAuthentication = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Nothing to do when there is no signal or it has already been cancelled.
            if (cancellationSignal == null || cancellationSignal.isCanceled()) {
                return;
            }
            cancellationSignal.cancel();
        }
    };
    emitter.setCancellable(cancelAuthentication);
}
SceneLifecycleListener.java 文件源码
项目:Stage
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Tracks the given emitter in {@code emitterList}, arranges for it to be removed from the
 * list again when it is cancelled, and then calls {@code emitMissingLifecycle} for it.
 *
 * @param emitter the emitter to register
 */
public void addEmitter(@NonNull final ObservableEmitter<Integer> emitter) {
    emitterList.add(emitter);

    final Cancellable untrackEmitter = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            emitterList.remove(emitter);
        }
    };
    emitter.setCancellable(untrackEmitter);

    emitMissingLifecycle(emitter);
}
RxFirebaseDatabase.java 文件源码
项目:showcase-android
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Listens for changes in the data at the given query location.
 *
 * Each {@code onDataChange} snapshot is emitted downstream; an {@code onCancelled} database
 * error is delivered as an {@code RxFirebaseDataException}. The value listener is removed
 * from the query when the emitter is cancelled.
 *
 * @param query    reference that represents a particular location in your Database and can
 *                 be used for reading or writing data to that Database location.
 * @param strategy {@link BackpressureStrategy} associated with this {@link Flowable}
 * @return a {@link Flowable} which emits when a value of the database changes in the given
 * query.
 */
@NonNull
public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query,
        @NonNull BackpressureStrategy strategy) {
    return Flowable.create(new FlowableOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Exception {
            final ValueEventListener snapshotForwarder = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot snapshot) {
                    emitter.onNext(snapshot);
                }

                @Override
                public void onCancelled(final DatabaseError databaseError) {
                    emitter.onError(new RxFirebaseDataException(databaseError));
                }
            };

            // Cancellation is registered before the listener is attached.
            final Cancellable detachListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(snapshotForwarder);
                }
            };
            emitter.setCancellable(detachListener);

            query.addValueEventListener(snapshotForwarder);
        }
    }, strategy);
}
IMUSensorGatherer.java 文件源码
项目:AndroidSensors
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that unregisters the given sensor
 * event listener from {@code sensorManager}.
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final SensorEventListener sensorEventListener) {
    final Cancellable unregisterListener = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            sensorManager.unregisterListener(sensorEventListener);
        }
    };
    subscriber.setCancellable(unregisterListener);
}
WifiMeasurementsGatherer.java 文件源码
项目:AndroidSensors
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that unregisters the given broadcast
 * receiver from {@code context}.
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final BroadcastReceiver broadcastReceiver) {
    final Cancellable unregisterReceiver = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            context.unregisterReceiver(broadcastReceiver);
        }
    };
    subscriber.setCancellable(unregisterReceiver);
}
BLEMeasurementsGatherer.java 文件源码
项目:AndroidSensors
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that flushes pending scan results for
 * the given callback and then stops the scan. The scanner is looked up once, when this
 * method is called, not at cancellation time.
 */
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
private void addUnsuscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final ScanCallback scanCallback) {
    final BluetoothLeScanner leScanner = bluetoothManager.getAdapter().getBluetoothLeScanner();
    final Cancellable stopScanning = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            leScanner.flushPendingScanResults(scanCallback);
            leScanner.stopScan(scanCallback);
        }
    };
    subscriber.setCancellable(stopScanning);
}
RawGPSMeasurementsGatherer.java 文件源码
项目:AndroidSensors
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that unregisters the given GNSS
 * measurements callback from {@code locationManager}.
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final GnssMeasurementsEvent.Callback callback) {
    final Cancellable unregisterCallback = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssMeasurementsCallback(callback);
        }
    };
    subscriber.setCancellable(unregisterCallback);
}
RawGPSStatusGatherer.java 文件源码
项目:AndroidSensors
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that unregisters the given GNSS status
 * callback from {@code locationManager}.
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final GnssStatus.Callback callback) {
    final Cancellable unregisterCallback = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssStatusCallback(callback);
        }
    };
    subscriber.setCancellable(unregisterCallback);
}
LocationGatherer.java 文件源码
项目:AndroidSensors
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that removes location updates for the
 * given listener from {@code locationManager}.
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final LocationListener locationListener) {
    final Cancellable removeUpdates = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.removeUpdates(locationListener);
        }
    };
    subscriber.setCancellable(removeUpdates);
}
RawGPSNavigationGatherer.java 文件源码
项目:AndroidSensors
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Registers a cancellation action on the subscriber that unregisters the given GNSS
 * navigation message callback from {@code locationManager}.
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
        final GnssNavigationMessage.Callback callback) {
    final Cancellable unregisterCallback = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssNavigationMessageCallback(callback);
        }
    };
    subscriber.setCancellable(unregisterCallback);
}
AbsServerAsActivity.java 文件源码
项目:Attendance
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Creates an Observable that emits the current text of the query field each time the
 * search button is clicked. The click listener is kept in the {@code onclicklist} field
 * while subscribed; on cancellation the field is cleared and the button's listener removed.
 */
private Observable<String> createButtonClickObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            // Emit the query text on every click of the search button.
            onclicklist = new View.OnClickListener() {
                @Override
                public void onClick(View clickedView) {
                    final String queryText = mQueryEditText.getText().toString();
                    emitter.onNext(queryText);
                }
            };
            mSearchButton.setOnClickListener(onclicklist);

            // Drop the listener again once the subscriber goes away.
            final Cancellable clearClickListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    onclicklist = null;
                    mSearchButton.setOnClickListener(null);
                }
            };
            emitter.setCancellable(clearClickListener);
        }
    });
}
CheeseActivity.java 文件源码
项目:Attendance
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Creates an Observable that emits the current text of the query field each time the
 * search button is clicked. On cancellation the button's click listener is removed.
 */
private Observable<String> createButtonClickObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            // Emit the query text on every click of the search button.
            final View.OnClickListener queryEmitter = new View.OnClickListener() {
                @Override
                public void onClick(View clickedView) {
                    final String queryText = mQueryEditText.getText().toString();
                    emitter.onNext(queryText);
                }
            };
            mSearchButton.setOnClickListener(queryEmitter);

            // Drop the listener again once the subscriber goes away.
            final Cancellable clearClickListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mSearchButton.setOnClickListener(null);
                }
            };
            emitter.setCancellable(clearClickListener);
        }
    });
}
AbsServerActivity.java 文件源码
项目:Attendance
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Creates an Observable that emits the current text of the query field each time the
 * search button is clicked. On cancellation the button's click listener is removed.
 */
private Observable<String> createButtonClickObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            // Emit the query text on every click of the search button.
            final View.OnClickListener queryEmitter = new View.OnClickListener() {
                @Override
                public void onClick(View clickedView) {
                    final String queryText = mQueryEditText.getText().toString();
                    emitter.onNext(queryText);
                }
            };
            mSearchButton.setOnClickListener(queryEmitter);

            // Drop the listener again once the subscriber goes away.
            final Cancellable clearClickListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mSearchButton.setOnClickListener(null);
                }
            };
            emitter.setCancellable(clearClickListener);
        }
    });
}
CompanyChooseActivity.java 文件源码
项目:Attendance
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Creates an Observable that emits the current text of the query field each time the
 * search button is clicked. The click listener is kept in the {@code onclicklist} field
 * while subscribed; on cancellation the field is cleared and the button's listener removed.
 */
private Observable<String> createButtonClickObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            // Emit the query text on every click of the search button.
            onclicklist = new View.OnClickListener() {
                @Override
                public void onClick(View clickedView) {
                    final String queryText = mQueryEditText.getText().toString();
                    emitter.onNext(queryText);
                }
            };
            mSearchButton.setOnClickListener(onclicklist);

            // Drop the listener again once the subscriber goes away.
            final Cancellable clearClickListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    onclicklist = null;
                    mSearchButton.setOnClickListener(null);
                }
            };
            emitter.setCancellable(clearClickListener);
        }
    });
}
AutoDisposeObserverTest.java 文件源码
项目:AutoDispose
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Checks that the source's Cancellable runs exactly once when the lifecycle scope fires,
 * including when the captured emitter is used both before and after the scope ends, and
 * that the scope no longer has observers afterwards.
 */
@Test
public void verifyCancellation() {
    final AtomicInteger cancelCount = new AtomicInteger();
    //noinspection unchecked because Java
    final ObservableEmitter<Integer>[] emitterHolder = new ObservableEmitter[1];
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) {
            // Count every cancellation of this source, then expose the emitter to the test.
            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() {
                    cancelCount.incrementAndGet();
                }
            });
            emitterHolder[0] = e;
        }
    });
    MaybeSubject<Integer> scope = MaybeSubject.create();

    source.as(AutoDispose.<Integer>autoDisposable(scope)).subscribe();

    // Nothing is cancelled while the scope is still open.
    assertThat(cancelCount.get()).isEqualTo(0);
    assertThat(scope.hasObservers()).isTrue();

    emitterHolder[0].onNext(1);
    scope.onSuccess(0);
    emitterHolder[0].onNext(2);

    // Ending the scope must cancel the source and detach from the scope.
    assertThat(cancelCount.get()).isEqualTo(1);
    assertThat(scope.hasObservers()).isFalse();
}