java类io.reactivex.functions.Cancellable的实例源码

RxBoxStore.java 文件源码 项目:ObjectBoxRxJava 阅读 36 收藏 0 点赞 0 评论 0
/**
 * Using the returned Observable, you can be notified about data changes.
 * Once a transaction is committed, you will get info on classes with changed Objects.
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
                @Override
                public void onData(Class data) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(data);
                    }
                }
            });
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    dataSubscription.cancel();
                }
            });
        }
    });
}
RxQuery.java 文件源码 项目:ObjectBoxRxJava 阅读 36 收藏 0 点赞 0 评论 0
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> data) {
            for (T datum : data) {
                if (emitter.isCancelled()) {
                    return;
                } else {
                    emitter.onNext(datum);
                }
            }
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        }
    });
    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            dataSubscription.cancel();
        }
    });
}
RxQuery.java 文件源码 项目:ObjectBoxRxJava 阅读 35 收藏 0 点赞 0 评论 0
/**
 * The returned Observable emits Query results as Lists.
 * Never completes, so you will get updates when underlying data changes.
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(data);
                    }
                }
            });
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    dataSubscription.cancel();
                }
            });
        }
    });
}
RxQuery.java 文件源码 项目:ObjectBoxRxJava 阅读 34 收藏 0 点赞 0 评论 0
/**
 * The returned Single emits one Query result as a List.
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final DataSubscription dataSubscription = query.subscribe().single().observer(new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (!emitter.isDisposed()) {
                        emitter.onSuccess(data);
                    }
                }
            });
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    dataSubscription.cancel();
                }
            });
        }
    });
}
RxLocationFlowableOnSubscribe.java 文件源码 项目:RxJava2-weather-example 阅读 31 收藏 0 点赞 0 评论 0
@Override
public final void subscribe(FlowableEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable ex) {
        emitter.onError(ex);
    }

    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            if (apiClient.isConnected()) {
                RxLocationFlowableOnSubscribe.this.onUnsubscribed(apiClient);
            }

            apiClient.disconnect();
        }
    });
}
RxLocationMaybeOnSubscribe.java 文件源码 项目:RxJava2-weather-example 阅读 27 收藏 0 点赞 0 评论 0
@Override
public final void subscribe(MaybeEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable ex) {
        emitter.onError(ex);
    }

    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            if (apiClient.isConnected()) {
                RxLocationMaybeOnSubscribe.this.onUnsubscribed(apiClient);
            }

            apiClient.disconnect();
        }
    });
}
RxLocationSingleOnSubscribe.java 文件源码 项目:RxJava2-weather-example 阅读 37 收藏 0 点赞 0 评论 0
@Override
public final void subscribe(SingleEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable ex) {
        emitter.onError(ex);
    }

    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            if (apiClient.isConnected()) {
                RxLocationSingleOnSubscribe.this.onUnsubscribed(apiClient);
            }

            apiClient.disconnect();
        }
    });
}
RxLocation.java 文件源码 项目:RxLocation 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Yields periodical location updates.
 *
 * This observable will never call onComplete() thus manual unsubscribe() is necessary.
 *
 * When using setExpirationDuration() or setNumUpdates() or setExpirationTime() the observable
 * will not terminate automatically and will just stop emitting new items without releasing any
 * resources.
 *
 * @return an Observable that returns Location items.
 */
@SuppressWarnings("WeakerAccess") // It's an entry point.
public static Observable<Location> locationUpdates(final Context context,
                                               final LocationRequest locationRequest) {
    return Observable.create(new ObservableOnSubscribe<Location>() {
        @Override
        public void subscribe(ObservableEmitter<Location> e) throws Exception {

            final LocationUpdatesHelper locationUpdatesHelper = new LocationUpdatesHelper(
                    context, new GoogleApiClientFactoryImpl(),
                    new FusedLocationProviderFactoryImpl(), e, locationRequest);

            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    locationUpdatesHelper.stop();
                }
            });

            locationUpdatesHelper.start();
        }
    });
}
RxLocation.java 文件源码 项目:RxLocation 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Yields the last location available to the system.
 *
 * This observable will emit only one element and then call onComplete.
 *
 * @return an Observable that returns one Location item.
 */
@SuppressWarnings("WeakerAccess") // It's an entry point.
public static Single<Location> lastLocation(final Context context) {
    return Observable.create(new ObservableOnSubscribe<Location>() {
        @Override
        public void subscribe(ObservableEmitter<Location> e) throws Exception {

            final LastLocationHelper lastLocationHelper = new LastLocationHelper(
                    context, new GoogleApiClientFactoryImpl(),
                    new FusedLocationProviderFactoryImpl(), e);

            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    lastLocationHelper.stop();
                }
            });

            lastLocationHelper.start();
        }
    }).singleOrError();
}
RxArrayDisk.java 文件源码 项目:Floppy 阅读 27 收藏 0 点赞 0 评论 0
public <T> Observable<T> observe(final String key) {
    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> e) throws Exception {
            addOnWriteListener(key, new OnWriteListener<T>() {
                @Override
                public void onWrite(T object) {
                    if (!e.isDisposed()) {
                        e.onNext(object);
                    }
                    e.setCancellable(new Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            removeListener(key);
                        }
                    });
                }
            });
        }
    });
}
AutoDisposeMaybeObserverTest.java 文件源码 项目:AutoDispose 阅读 37 收藏 0 点赞 0 评论 0
@Test public void verifyCancellation() {
  final AtomicInteger i = new AtomicInteger();
  //noinspection unchecked because Java
  Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
    @Override public void subscribe(MaybeEmitter<Integer> e) {
      e.setCancellable(new Cancellable() {
        @Override public void cancel() {
          i.incrementAndGet();
        }
      });
    }
  });
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  source.as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .subscribe();

  assertThat(i.get()).isEqualTo(0);
  assertThat(lifecycle.hasObservers()).isTrue();

  lifecycle.onSuccess(0);

  // Verify cancellation was called
  assertThat(i.get()).isEqualTo(1);
  assertThat(lifecycle.hasObservers()).isFalse();
}
AutoDisposeCompletableObserverTest.java 文件源码 项目:AutoDispose 阅读 31 收藏 0 点赞 0 评论 0
@Test public void verifyCancellation() {
  final AtomicInteger i = new AtomicInteger();
  //noinspection unchecked because Java
  Completable source = Completable.create(new CompletableOnSubscribe() {
    @Override public void subscribe(CompletableEmitter e) {
      e.setCancellable(new Cancellable() {
        @Override public void cancel() {
          i.incrementAndGet();
        }
      });
    }
  });
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  source.as(autoDisposable(lifecycle))
      .subscribe();

  assertThat(i.get()).isEqualTo(0);
  assertThat(lifecycle.hasObservers()).isTrue();

  lifecycle.onSuccess(0);

  // Verify cancellation was called
  assertThat(i.get()).isEqualTo(1);
  assertThat(lifecycle.hasObservers()).isFalse();
}
AutoDisposeSingleObserverTest.java 文件源码 项目:AutoDispose 阅读 33 收藏 0 点赞 0 评论 0
@Test public void verifyCancellation() {
  final AtomicInteger i = new AtomicInteger();
  //noinspection unchecked because Java
  Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() {
    @Override public void subscribe(SingleEmitter<Integer> e) {
      e.setCancellable(new Cancellable() {
        @Override public void cancel() {
          i.incrementAndGet();
        }
      });
    }
  });
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  source.as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .subscribe();

  assertThat(i.get()).isEqualTo(0);
  assertThat(lifecycle.hasObservers()).isTrue();

  lifecycle.onSuccess(0);

  // Verify cancellation was called
  assertThat(i.get()).isEqualTo(1);
  assertThat(lifecycle.hasObservers()).isFalse();
}
AppStateObservableOnSubscribe.java 文件源码 项目:RxAppState 阅读 30 收藏 0 点赞 0 评论 0
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
  final AppStateListener appStateListener = new AppStateListener() {
    @Override
    public void onAppDidEnterForeground() {
      appStateEmitter.onNext(FOREGROUND);
    }

    @Override
    public void onAppDidEnterBackground() {
      appStateEmitter.onNext(BACKGROUND);
    }
  };

  appStateEmitter.setCancellable(new Cancellable() {
    @Override public void cancel() throws Exception {
      recognizer.removeListener(appStateListener);
      recognizer.stop();
    }
  });

  recognizer.addListener(appStateListener);
  recognizer.start();
}
RxValue.java 文件源码 项目:rxfirebase 阅读 38 收藏 0 点赞 0 评论 0
/**
 * @param query
 * @return
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    return Single.create(new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener listener = listener(emit);

            emit.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(listener);
                }
            });

            query.addListenerForSingleValueEvent(listener);
        }
    });
}
FingerprintObservable.java 文件源码 项目:RxFingerprint 阅读 27 收藏 0 点赞 0 评论 0
@Override
@RequiresPermission(USE_FINGERPRINT)
@RequiresApi(Build.VERSION_CODES.M)
public void subscribe(ObservableEmitter<T> emitter) throws Exception {
    if (fingerprintApiWrapper.isUnavailable()) {
        emitter.onError(new FingerprintUnavailableException("Fingerprint authentication is not available on this device! Ensure that the device has a Fingerprint sensor and enrolled Fingerprints by calling RxFingerprint#isAvailable(Context) first"));
        return;
    }

    AuthenticationCallback callback = createAuthenticationCallback(emitter);
    cancellationSignal = fingerprintApiWrapper.createCancellationSignal();
    CryptoObject cryptoObject = initCryptoObject(emitter);
    //noinspection MissingPermission
    fingerprintApiWrapper.getFingerprintManager().authenticate(cryptoObject, cancellationSignal, 0, callback, null);

    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            if (cancellationSignal != null && !cancellationSignal.isCanceled()) {
                cancellationSignal.cancel();
            }
        }
    });
}
SceneLifecycleListener.java 文件源码 项目:Stage 阅读 29 收藏 0 点赞 0 评论 0
public void addEmitter(@NonNull final ObservableEmitter<Integer> emitter) {
  emitterList.add(emitter);
  emitter.setCancellable(new Cancellable() {
    @Override
    public void cancel() throws Exception {
      emitterList.remove(emitter);
    }
  });
  emitMissingLifecycle(emitter);
}
RxFirebaseDatabase.java 文件源码 项目:showcase-android 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Listener for changes in te data at the given query location.
 *
 * @param query    reference represents a particular location in your Database and can be used for reading or writing data to that Database location.
 * @param strategy {@link BackpressureStrategy} associated to this {@link Flowable}
 * @return a {@link Flowable} which emits when a value of the database change in the given query.
 */
@NonNull
public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query,
                                                       @NonNull BackpressureStrategy strategy) {
   return Flowable.create(new FlowableOnSubscribe<DataSnapshot>() {
      @Override
      public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Exception {
         final ValueEventListener valueEventListener = new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
               emitter.onNext(dataSnapshot);
            }

            @Override
            public void onCancelled(final DatabaseError error) {
               emitter.onError(new RxFirebaseDataException(error));
            }
         };
         emitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
               query.removeEventListener(valueEventListener);
            }
         });
         query.addValueEventListener(valueEventListener);
      }
   }, strategy);
}
IMUSensorGatherer.java 文件源码 项目:AndroidSensors 阅读 28 收藏 0 点赞 0 评论 0
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final SensorEventListener sensorEventListener) {
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            sensorManager.unregisterListener(sensorEventListener);
        }
    });
}
WifiMeasurementsGatherer.java 文件源码 项目:AndroidSensors 阅读 35 收藏 0 点赞 0 评论 0
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final BroadcastReceiver broadcastReceiver){
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            context.unregisterReceiver(broadcastReceiver);
        }
    });
}
BLEMeasurementsGatherer.java 文件源码 项目:AndroidSensors 阅读 28 收藏 0 点赞 0 评论 0
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
private void addUnsuscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                      final ScanCallback scanCallback){
    final BluetoothLeScanner scanner = bluetoothManager.getAdapter().getBluetoothLeScanner();
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            scanner.flushPendingScanResults(scanCallback);
            scanner.stopScan(scanCallback);
        }
    });
}
RawGPSMeasurementsGatherer.java 文件源码 项目:AndroidSensors 阅读 28 收藏 0 点赞 0 评论 0
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssMeasurementsEvent.Callback callback) {
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssMeasurementsCallback(callback);
        }
    });
}
RawGPSStatusGatherer.java 文件源码 项目:AndroidSensors 阅读 27 收藏 0 点赞 0 评论 0
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssStatus.Callback callback) {
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssStatusCallback(callback);
        }
    });
}
LocationGatherer.java 文件源码 项目:AndroidSensors 阅读 23 收藏 0 点赞 0 评论 0
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final LocationListener locationListener) {
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.removeUpdates(locationListener);
        }
    });
}
RawGPSNavigationGatherer.java 文件源码 项目:AndroidSensors 阅读 36 收藏 0 点赞 0 评论 0
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssNavigationMessage.Callback callback) {
    subscriber.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssNavigationMessageCallback(callback);
        }
    });
}
AbsServerAsActivity.java 文件源码 项目:Attendance 阅读 22 收藏 0 点赞 0 评论 0
private Observable<String> createButtonClickObservable() {

    // 2
    return Observable.create(new ObservableOnSubscribe<String>() {

      // 3
      @Override
      public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        // 4
            onclicklist = new View.OnClickListener() {
              @Override
              public void onClick(View view) {
                  // 5
                  emitter.onNext(mQueryEditText.getText().toString());
              }
            };
            mSearchButton.setOnClickListener(onclicklist);

        // 6
        emitter.setCancellable(new Cancellable() {
          @Override
          public void cancel() throws Exception {
            // 7
              onclicklist = null;
              mSearchButton.setOnClickListener(null);
          }
        });
      }
    });
  }
CheeseActivity.java 文件源码 项目:Attendance 阅读 29 收藏 0 点赞 0 评论 0
private Observable<String> createButtonClickObservable() {

    // 2
    return Observable.create(new ObservableOnSubscribe<String>() {

      // 3
      @Override
      public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        // 4
        mSearchButton.setOnClickListener(new View.OnClickListener() {
          @Override
          public void onClick(View view) {
            // 5
            emitter.onNext(mQueryEditText.getText().toString());
          }
        });

        // 6
        emitter.setCancellable(new Cancellable() {
          @Override
          public void cancel() throws Exception {
            // 7
            mSearchButton.setOnClickListener(null);
          }
        });
      }
    });
  }
AbsServerActivity.java 文件源码 项目:Attendance 阅读 25 收藏 0 点赞 0 评论 0
private Observable<String> createButtonClickObservable() {

    // 2
    return Observable.create(new ObservableOnSubscribe<String>() {

      // 3
      @Override
      public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        // 4
        mSearchButton.setOnClickListener(new View.OnClickListener() {
          @Override
          public void onClick(View view) {
            // 5
            emitter.onNext(mQueryEditText.getText().toString());
          }
        });

        // 6
        emitter.setCancellable(new Cancellable() {
          @Override
          public void cancel() throws Exception {
            // 7
            mSearchButton.setOnClickListener(null);
          }
        });
      }
    });
  }
CompanyChooseActivity.java 文件源码 项目:Attendance 阅读 28 收藏 0 点赞 0 评论 0
private Observable<String> createButtonClickObservable() {

    // 2
    return Observable.create(new ObservableOnSubscribe<String>() {

      // 3
      @Override
      public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        // 4
            onclicklist = new View.OnClickListener() {
              @Override
              public void onClick(View view) {
                  // 5
                  emitter.onNext(mQueryEditText.getText().toString());
              }
            };
            mSearchButton.setOnClickListener(onclicklist);

        // 6
        emitter.setCancellable(new Cancellable() {
          @Override
          public void cancel() throws Exception {
            // 7
              onclicklist = null;
              mSearchButton.setOnClickListener(null);
          }
        });
      }
    });
  }
AutoDisposeObserverTest.java 文件源码 项目:AutoDispose 阅读 26 收藏 0 点赞 0 评论 0
@Test public void verifyCancellation() {
  final AtomicInteger i = new AtomicInteger();
  //noinspection unchecked because Java
  final ObservableEmitter<Integer>[] emitter = new ObservableEmitter[1];
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override public void subscribe(ObservableEmitter<Integer> e) {
      e.setCancellable(new Cancellable() {
        @Override public void cancel() {
          i.incrementAndGet();
        }
      });
      emitter[0] = e;
    }
  });
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  source.as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .subscribe();

  assertThat(i.get()).isEqualTo(0);
  assertThat(lifecycle.hasObservers()).isTrue();

  emitter[0].onNext(1);

  lifecycle.onSuccess(0);
  emitter[0].onNext(2);

  // Verify cancellation was called
  assertThat(i.get()).isEqualTo(1);
  assertThat(lifecycle.hasObservers()).isFalse();
}


问题


面经


文章

微信
公众号

扫码关注公众号