/**
 * Returns an Observable that puts the given operation on the queue each time it is subscribed to.
 * <p>
 * Cancelling the emitter removes the entry from the queue again; the removal is logged only
 * when the entry was actually still present in the queue.
 *
 * @param operation the operation to enqueue
 * @param <T> type of values produced by the operation
 * @return an Observable created with {@link Emitter.BackpressureMode#NONE}
 */
@Override
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public <T> Observable<T> queue(final Operation<T> operation) {
    final Action1<Emitter<T>> enqueueOnSubscribe = new Action1<Emitter<T>>() {
        @Override
        public void call(Emitter<T> tEmitter) {
            final FIFORunnableEntry queuedEntry = new FIFORunnableEntry<>(operation, tEmitter);
            final Cancellable dequeueOnCancel = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    final boolean wasStillQueued = queue.remove(queuedEntry);
                    if (wasStillQueued) {
                        logOperationRemoved(operation);
                    }
                }
            };
            // The cancellation is registered before the entry becomes visible in the queue.
            tEmitter.setCancellation(dequeueOnCancel);
            logOperationQueued(operation);
            queue.add(queuedEntry);
        }
    };
    return Observable.create(enqueueOnSubscribe, Emitter.BackpressureMode.NONE);
}
java类rx.functions.Cancellable的实例源码
ClientOperationQueueImpl.java 文件源码
项目:RxAndroidBle
阅读 28
收藏 0
点赞 0
评论 0
ConnectionOperationQueueImpl.java 文件源码
项目:RxAndroidBle
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Returns an Observable that enqueues the given operation on subscription, or an erroring
 * Observable carrying {@code disconnectionException} when {@code shouldRun} is false at call time.
 * <p>
 * Cancelling the emitter removes the entry from the queue; the removal is logged only when the
 * entry was still queued.
 *
 * @param operation the operation to enqueue
 * @param <T> type of values produced by the operation
 * @return the queueing Observable, or {@code Observable.error(disconnectionException)}
 */
@Override
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public synchronized <T> Observable<T> queue(final Operation<T> operation) {
    if (shouldRun) {
        final Action1<Emitter<T>> enqueueOnSubscribe = new Action1<Emitter<T>>() {
            @Override
            public void call(Emitter<T> tEmitter) {
                final FIFORunnableEntry queuedEntry = new FIFORunnableEntry<>(operation, tEmitter);
                final Cancellable dequeueOnCancel = new Cancellable() {
                    @Override
                    public void cancel() throws Exception {
                        final boolean wasStillQueued = queue.remove(queuedEntry);
                        if (wasStillQueued) {
                            logOperationRemoved(operation);
                        }
                    }
                };
                tEmitter.setCancellation(dequeueOnCancel);
                logOperationQueued(operation);
                queue.add(queuedEntry);
            }
        };
        return Observable.create(enqueueOnSubscribe, Emitter.BackpressureMode.NONE);
    }
    return Observable.error(disconnectionException);
}
ScanOperation.java 文件源码
项目:RxAndroidBle
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Starts a scan and wires its lifecycle to the given emitter.
 * <p>
 * A cancellation that stops the scan is registered before the scan is started. If starting the
 * scan reports failure or throws, the emitter receives a {@link BleScanException} with
 * {@link BleScanException#BLUETOOTH_CANNOT_START}. The queue is released in every case.
 *
 * @param emitter receiver of scan results and errors
 * @param queueReleaseInterface handle used to release the operation queue
 */
@Override
final protected void protectedRun(final Emitter<SCAN_RESULT_TYPE> emitter, QueueReleaseInterface queueReleaseInterface) {
    final SCAN_CALLBACK_TYPE scanCallback = createScanCallback(emitter);
    try {
        final Cancellable stopScanOnCancel = new Cancellable() {
            @Override
            public void cancel() throws Exception {
                RxBleLog.i("Scan operation is requested to stop.");
                stopScan(rxBleAdapterWrapper, scanCallback);
            }
        };
        emitter.setCancellation(stopScanOnCancel);

        RxBleLog.i("Scan operation is requested to start.");
        final boolean scanStarted = startScan(rxBleAdapterWrapper, scanCallback);
        if (!scanStarted) {
            emitter.onError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START));
        }
    } catch (Throwable startFailure) {
        RxBleLog.e(startFailure, "Error while calling the start scan function");
        emitter.onError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START));
    } finally {
        // Released whether or not the scan could be started.
        queueReleaseInterface.release();
    }
}
RxAhoy.java 文件源码
项目:ahoy-android
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Exposes visit updates of the given {@code Ahoy} instance as an Observable.
 * <p>
 * Each subscription registers its own {@link VisitListener}, which is removed again when the
 * emitter is cancelled.
 *
 * @param ahoy source of visit updates
 * @return an Observable created with {@code BackpressureMode.LATEST}
 */
public static Observable<Visit> visitStream(final Ahoy ahoy) {
    final Action1<Emitter<Visit>> listenerBridge = new Action1<Emitter<Visit>>() {
        @Override
        public void call(final Emitter<Visit> emitter) {
            final VisitListener forwarder = new VisitListener() {
                @Override
                public void onVisitUpdated(Visit visit) {
                    emitter.onNext(visit);
                }
            };
            ahoy.addVisitListener(forwarder);

            final Cancellable detachListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    ahoy.removeVisitListener(forwarder);
                }
            };
            emitter.setCancellation(detachListener);
        }
    };
    return Observable.create(listenerBridge, BackpressureMode.LATEST);
}
AppStateEmitter.java 文件源码
项目:RxAppState
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Bridges the recognizer's foreground/background callbacks to the given emitter.
 * <p>
 * The cancellation (remove listener, stop recognizer) is registered first; afterwards the
 * listener is added and the recognizer is started.
 *
 * @param appStateEmitter emitter receiving {@code FOREGROUND} / {@code BACKGROUND}
 */
@Override
public void call(final Emitter<AppState> appStateEmitter) {
    final AppStateListener forwardingListener = new AppStateListener() {
        @Override
        public void onAppDidEnterForeground() {
            appStateEmitter.onNext(FOREGROUND);
        }

        @Override
        public void onAppDidEnterBackground() {
            appStateEmitter.onNext(BACKGROUND);
        }
    };

    final Cancellable tearDown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            recognizer.removeListener(forwardingListener);
            recognizer.stop();
        }
    };
    appStateEmitter.setCancellation(tearDown);

    recognizer.addListener(forwardingListener);
    recognizer.start();
}
Value.java 文件源码
项目:Raclette
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Converts a data-binding {@link ObservableField} into an Rx Observable.
 * <p>
 * On subscription a property-changed callback is registered on the field; it is removed again
 * when the emitter is cancelled. Every change notification for the field emits the field's
 * current value.
 *
 * @param observableField the field to observe
 * @param emitCurrent when {@code true}, the field's current value is emitted on subscription
 *                    if it is not {@code null}
 * @param <T> the value type of the field
 * @return an Observable created with {@link Emitter.BackpressureMode#BUFFER}
 */
public static <T> Observable<T> toObservable(final ObservableField<T> observableField, final boolean emitCurrent) {
    return Observable.create(new Action1<Emitter<T>>() {
        @Override
        public void call(final Emitter<T> emitter) {
            if (emitCurrent) {
                // Read the field once: checking and emitting from two separate get() calls could
                // let a concurrent update slip a null past the null guard.
                final T current = observableField.get();
                if (current != null) {
                    emitter.onNext(current);
                }
            }
            final OnPropertyChangedCallback callback = new OnPropertyChangedCallback() {
                @Override
                public void onPropertyChanged(android.databinding.Observable dataBindingObservable, int propertyId) {
                    if (dataBindingObservable == observableField) {
                        emitter.onNext(observableField.get());
                    }
                }
            };
            observableField.addOnPropertyChangedCallback(callback);
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    observableField.removeOnPropertyChangedCallback(callback);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);
}
RxApollo.java 文件源码
项目:apollo-android
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Adapts an {@link ApolloQueryWatcher} to an Observable.
 * <p>
 * Subscribing enqueues the watcher; cancelling the subscription cancels the watcher and
 * suppresses any response or failure delivered afterwards.
 *
 * @param watcher the ApolloQueryWatcher to convert
 * @param backpressureMode the back pressure strategy to apply to the observable source.
 * @param <T> the value type
 * @return the converted Observable
 */
@Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloQueryWatcher<T> watcher,
    @Nonnull Emitter.BackpressureMode backpressureMode) {
  checkNotNull(backpressureMode, "backpressureMode == null");
  checkNotNull(watcher, "watcher == null");

  final Action1<Emitter<Response<T>>> watchOnSubscribe = new Action1<Emitter<Response<T>>>() {
    @Override public void call(final Emitter<Response<T>> emitter) {
      final AtomicBoolean cancelled = new AtomicBoolean();

      final Cancellable cancelWatcher = new Cancellable() {
        @Override public void cancel() throws Exception {
          cancelled.set(true);
          watcher.cancel();
        }
      };
      emitter.setCancellation(cancelWatcher);

      final ApolloCall.Callback<T> forwarder = new ApolloCall.Callback<T>() {
        @Override public void onResponse(@Nonnull Response<T> response) {
          if (cancelled.get()) {
            return;
          }
          emitter.onNext(response);
        }

        @Override public void onFailure(@Nonnull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (cancelled.get()) {
            return;
          }
          emitter.onError(e);
        }
      };
      watcher.enqueueAndWatch(forwarder);
    }
  };
  return Observable.create(watchOnSubscribe, backpressureMode);
}
RxApollo.java 文件源码
项目:apollo-android
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Converts an {@link ApolloCall} to an Observable. The number of emissions this Observable will have is based on the
 * {@link ResponseFetcher} used with the call.
 * <p>
 * Subscribing enqueues the call; cancelling the subscription cancels the call and suppresses any
 * callback delivered afterwards. The Observable completes when the call reports
 * {@link ApolloCall.StatusEvent#COMPLETED}.
 *
 * @param call the ApolloCall to convert
 * @param backpressureMode The {@link rx.Emitter.BackpressureMode} to use; must not be null.
 * @param <T> the value type
 * @return the converted Observable
 */
@Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloCall<T> call,
    @Nonnull Emitter.BackpressureMode backpressureMode) {
  checkNotNull(call, "call == null");
  // Fail fast, consistent with the ApolloQueryWatcher overload, instead of deferring the
  // failure to subscription time.
  checkNotNull(backpressureMode, "backpressureMode == null");
  return Observable.create(new Action1<Emitter<Response<T>>>() {
    @Override public void call(final Emitter<Response<T>> emitter) {
      final AtomicBoolean canceled = new AtomicBoolean();
      emitter.setCancellation(new Cancellable() {
        @Override public void cancel() throws Exception {
          canceled.set(true);
          call.cancel();
        }
      });
      call.enqueue(new ApolloCall.Callback<T>() {
        @Override public void onResponse(@Nonnull Response<T> response) {
          if (!canceled.get()) {
            emitter.onNext(response);
          }
        }

        @Override public void onFailure(@Nonnull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (!canceled.get()) {
            emitter.onError(e);
          }
        }

        @Override public void onStatusEvent(@Nonnull ApolloCall.StatusEvent event) {
          if (!canceled.get() && event == ApolloCall.StatusEvent.COMPLETED) {
            emitter.onCompleted();
          }
        }
      });
    }
  }, backpressureMode);
}
RxApollo.java 文件源码
项目:apollo-android
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Converts an {@link ApolloSubscriptionCall} to an Observable.
 * <p>
 * Subscribing executes the subscription call; cancelling the Rx subscription cancels the call
 * and suppresses any response, failure or completion delivered afterwards.
 *
 * @param call the ApolloSubscriptionCall to convert
 * @param backpressureMode the {@link rx.Emitter.BackpressureMode} to use; must not be null
 * @param <T> the value type
 * @return the converted Observable
 */
@Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloSubscriptionCall<T> call,
    @Nonnull Emitter.BackpressureMode backpressureMode) {
  checkNotNull(call, "call == null");
  // Fail fast, consistent with the ApolloQueryWatcher overload, instead of deferring the
  // failure to subscription time.
  checkNotNull(backpressureMode, "backpressureMode == null");
  return Observable.create(new Action1<Emitter<Response<T>>>() {
    @Override public void call(final Emitter<Response<T>> emitter) {
      final AtomicBoolean canceled = new AtomicBoolean();
      emitter.setCancellation(new Cancellable() {
        @Override public void cancel() throws Exception {
          canceled.set(true);
          call.cancel();
        }
      });
      call.execute(new ApolloSubscriptionCall.Callback<T>() {
        @Override public void onResponse(@Nonnull Response<T> response) {
          if (!canceled.get()) {
            emitter.onNext(response);
          }
        }

        @Override public void onFailure(@Nonnull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (!canceled.get()) {
            emitter.onError(e);
          }
        }

        @Override public void onCompleted() {
          if (!canceled.get()) {
            emitter.onCompleted();
          }
        }
      });
    }
  }, backpressureMode);
}
MerlinAction.java 文件源码
项目:merlin
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Builds the cancellation hook for the emitter.
 *
 * @return a {@link Cancellable} that calls {@code merlin.unbind()} when cancelled
 */
private Cancellable createCancellable() {
    final Cancellable unbindOnCancel = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            merlin.unbind();
        }
    };
    return unbindOnCancel;
}
ConnectOperation.java 文件源码
项目:RxAndroidBle
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Emits BluetoothGatt and completes after connection is established.
 * <p>
 * On each subscription the emitter is first subscribed to a one-element stream (the connected BluetoothGatt merged
 * with whatever {@code rxBleGattCallback.observeDisconnect()} signals), then the CONNECTING state is reported and
 * only afterwards the GATT connection is initiated.
 *
 * @return BluetoothGatt after connection reaches {@link com.polidea.rxandroidble.RxBleConnection.RxBleConnectionState#CONNECTED}
 * state.
 * @throws com.polidea.rxandroidble.exceptions.BleDisconnectedException if connection was disconnected/failed before it was established.
 */
@NonNull
private Observable<BluetoothGatt> getConnectedBluetoothGatt() {
// start connecting the BluetoothGatt
// note: Due to different Android BLE stack implementations it is not certain whether `connectGatt()` or `BluetoothGattCallback`
// will emit BluetoothGatt first
return Observable.create(
new Action1<Emitter<BluetoothGatt>>() {
@Override
public void call(Emitter<BluetoothGatt> emitter) {
// the callable runs only after the delaySubscription trigger below has fired; it reports CONNECTED
// and reads the BluetoothGatt from the provider
final Subscription connectedBluetoothGattSubscription = Observable.fromCallable(new Func0<BluetoothGatt>() {
@Override
public BluetoothGatt call() {
connectionStateChangedAction.onConnectionStateChange(CONNECTED);
return bluetoothGattProvider.getBluetoothGatt();
}
})
// when the connected state will be emitted bluetoothGattProvider should contain valid Gatt
.delaySubscription(
rxBleGattCallback
.getOnConnectionStateChange()
.takeFirst(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
@Override
public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
return rxBleConnectionState == CONNECTED;
}
})
)
// disconnect may happen even if the connection was not established yet
.mergeWith(rxBleGattCallback.<BluetoothGatt>observeDisconnect())
// only the first signal (connected Gatt or disconnect) is forwarded to the emitter
.take(1)
.subscribe(emitter);
// cancelling the emitter tears down the inner subscription created above
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
connectedBluetoothGattSubscription.unsubscribe();
}
});
connectionStateChangedAction.onConnectionStateChange(CONNECTING);
/*
* Apparently the connection may be established fast enough to introduce a race condition so the subscription
* must be established first before starting the connection.
* https://github.com/Polidea/RxAndroidBle/issues/178
* */
final BluetoothGatt bluetoothGatt = connectionCompat
.connectGatt(bluetoothDevice, autoConnect, rxBleGattCallback.getBluetoothGattCallback());
/*
* Update BluetoothGatt when connection is initiated. It is not certain
* if this or RxBleGattCallback.onConnectionStateChange will be first.
* */
bluetoothGattProvider.updateBluetoothGatt(bluetoothGatt);
}
},
Emitter.BackpressureMode.NONE
);
}
ClientStateObservable.java 文件源码
项目:RxAndroidBle
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Creates an Observable of {@link RxBleClient.State} changes.
 * <p>
 * On subscription: if the adapter wrapper reports no Bluetooth adapter the stream completes
 * immediately. Otherwise the result of {@code checkPermissionUntilGranted} is mapped through
 * {@code checkAdapterAndServicesState}, de-duplicated with {@code distinctUntilChanged()} and
 * forwarded to the emitter; cancelling the emitter unsubscribes that inner stream.
 *
 * @param rxBleAdapterWrapper wrapper queried for the presence of a Bluetooth adapter
 * @param bleAdapterStateObservable adapter state stream passed to {@code checkAdapterAndServicesState}
 * @param locationServicesOkObservable location-services stream passed to {@code checkAdapterAndServicesState}
 * @param locationServicesStatus status object passed to {@code checkPermissionUntilGranted}
 * @param timerScheduler scheduler passed to {@code checkPermissionUntilGranted}
 */
@Inject
protected ClientStateObservable(
        final RxBleAdapterWrapper rxBleAdapterWrapper,
        final Observable<RxBleAdapterStateObservable.BleAdapterState> bleAdapterStateObservable,
        @Named(ClientComponent.NamedBooleanObservables.LOCATION_SERVICES_OK) final Observable<Boolean> locationServicesOkObservable,
        final LocationServicesStatus locationServicesStatus,
        @Named(ClientComponent.NamedSchedulers.TIMEOUT) final Scheduler timerScheduler
) {
    super(new OnSubscribeCreate<>(
            new Action1<Emitter<RxBleClient.State>>() {
                @Override
                public void call(Emitter<RxBleClient.State> emitter) {
                    final boolean adapterPresent = rxBleAdapterWrapper.hasBluetoothAdapter();
                    if (!adapterPresent) {
                        emitter.onCompleted();
                        return;
                    }

                    final Func1<Boolean, Observable<RxBleClient.State>> toStateStream =
                            new Func1<Boolean, Observable<RxBleClient.State>>() {
                                @Override
                                public Observable<RxBleClient.State> call(Boolean permissionWasInitiallyGranted) {
                                    return checkAdapterAndServicesState(
                                            permissionWasInitiallyGranted,
                                            rxBleAdapterWrapper,
                                            bleAdapterStateObservable,
                                            locationServicesOkObservable
                                    );
                                }
                            };

                    final Subscription stateUpdates = checkPermissionUntilGranted(locationServicesStatus, timerScheduler)
                            .flatMapObservable(toStateStream)
                            .distinctUntilChanged()
                            .subscribe(emitter);

                    final Cancellable stopStateUpdates = new Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            stateUpdates.unsubscribe();
                        }
                    };
                    emitter.setCancellation(stopStateUpdates);
                }
            },
            Emitter.BackpressureMode.LATEST
    ));
}
AppStateEmitterTest.java 文件源码
项目:RxAppState
阅读 17
收藏 0
点赞 0
评论 0
// Verifies that some Cancellable instance was registered on the mock emitter via setCancellation.
// NOTE(review): the interaction under test presumably happens in a setup method outside this snippet — confirm.
@Test
public void setsCancellation() {
verify(mockEmitter).setCancellation(any(Cancellable.class));
}
LocalStorageRepoImpl.java 文件源码
项目:android-common
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Loads the entity stored under the given id.
 * <p>
 * Only the first query emission is used. If the cursor has a first row, its {@code DATA_COLUMN}
 * string is deserialized with {@code JsonMapper} and emitted; the stream then completes. A
 * deserialization {@link IOException} or a {@code null} cursor is delivered through
 * {@code onError}. The cursor is closed when the emitter is cancelled.
 *
 * @param id value bound to the {@code id = ?} placeholder of the query
 * @return an Observable emitting at most one entity
 */
@Override
public Observable<T> get(String id) {
    LoggerHelper.logDebug("local:" + this.getClass().toString() + " get:" + id);
    return database
            .get()
            .createQuery(getTableName(), "SELECT * FROM " + getTableName() + " WHERE id = ?", id)
            .take(1)
            .switchMap(
                    new Func1<SqlBrite.Query, Observable<? extends T>>() {
                        @Override
                        public Observable<? extends T> call(final SqlBrite.Query map) {
                            return Observable.create(
                                    new Action1<Emitter<T>>() {
                                        @Override
                                        public void call(Emitter<T> emitter) {
                                            final Cursor cursor = map.run();
                                            // Query.run() may yield null when the store cannot execute the query;
                                            // report that explicitly instead of failing with an accidental NPE.
                                            if (cursor == null) {
                                                emitter.onError(new IllegalStateException(
                                                        "Query returned no cursor for table "
                                                                + LocalStorageRepoImpl.this.getTableName()));
                                                return;
                                            }
                                            emitter.setCancellation(
                                                    new Cancellable() {
                                                        @Override
                                                        public void cancel() throws Exception {
                                                            cursor.close();
                                                        }
                                                    });
                                            if (!cursor.isClosed() && cursor.moveToFirst()) {
                                                try {
                                                    emitter.onNext(
                                                            JsonMapper.INSTANCE.fromJson(
                                                                    cursor.getString(DATA_COLUMN),
                                                                    LocalStorageRepoImpl.this.getType()));
                                                } catch (IOException e) {
                                                    emitter.onError(e);
                                                    return;
                                                }
                                            }
                                            emitter.onCompleted();
                                        }
                                    },
                                    Emitter.BackpressureMode.LATEST);
                        }
                    });
}
LocalStorageRepoImpl.java 文件源码
项目:android-common
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Loads every entity stored in this repository's table.
 * <p>
 * Only the first query emission is used. Each row's {@code DATA_COLUMN} string is deserialized
 * with {@code JsonMapper} and emitted; iteration stops once the cursor is exhausted or closed,
 * after which the stream completes. A deserialization {@link IOException} or a {@code null}
 * cursor is delivered through {@code onError}. The cursor is closed when the emitter is cancelled.
 *
 * @return an Observable emitting one entity per row
 */
@Override
public Observable<T> getAll() {
    LoggerHelper.logDebug("local:" + this.getClass().toString() + " getAll");
    return database
            .get()
            .createQuery(getTableName(), "SELECT * FROM " + getTableName())
            .take(1)
            .switchMap(
                    new Func1<SqlBrite.Query, Observable<? extends T>>() {
                        @Override
                        public Observable<? extends T> call(final SqlBrite.Query map) {
                            return Observable.create(
                                    new Action1<Emitter<T>>() {
                                        @Override
                                        public void call(Emitter<T> emitter) {
                                            final Cursor cursor = map.run();
                                            // Query.run() may yield null when the store cannot execute the query;
                                            // report that explicitly instead of failing with an accidental NPE.
                                            if (cursor == null) {
                                                emitter.onError(new IllegalStateException(
                                                        "Query returned no cursor for table "
                                                                + LocalStorageRepoImpl.this.getTableName()));
                                                return;
                                            }
                                            emitter.setCancellation(
                                                    new Cancellable() {
                                                        @Override
                                                        public void cancel() throws Exception {
                                                            cursor.close();
                                                        }
                                                    });
                                            // Cancellation closes the cursor, which also ends this loop.
                                            while (!cursor.isClosed() && cursor.moveToNext()) {
                                                try {
                                                    emitter.onNext(
                                                            JsonMapper.INSTANCE.fromJson(
                                                                    cursor.getString(DATA_COLUMN),
                                                                    LocalStorageRepoImpl.this.getType()));
                                                } catch (IOException e) {
                                                    emitter.onError(e);
                                                    return;
                                                }
                                            }
                                            emitter.onCompleted();
                                        }
                                    },
                                    Emitter.BackpressureMode.BUFFER);
                        }
                    });
}