/**
 * Hooks a property-change listener up to {@code propertyChangeSupport} on behalf of the given
 * subscriber.
 *
 * <p>An event is forwarded only while the subscriber is still subscribed and only when the
 * event's property name equals {@code property}. The event's new value is cast to {@code T}
 * without a runtime check, which is why the unchecked warning is suppressed.
 *
 * <p>A {@code BooleanSubscription} whose action removes the listener again is added to the
 * subscriber.
 *
 * @param subscriber the downstream subscriber that receives the new property values
 */
@Override
@SuppressWarnings("unchecked")
public void call(final Subscriber<? super T> subscriber) {
    final PropertyChangeListener forwarder = new PropertyChangeListener() {
        @Override
        public void propertyChange(PropertyChangeEvent event) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            if (!property.equals(event.getPropertyName())) {
                return;
            }
            subscriber.onNext((T) event.getNewValue());
        }
    };

    // Teardown action: detaches the forwarder from the property-change support.
    final Action0 detach = new Action0() {
        @Override
        public void call() {
            propertyChangeSupport.removePropertyChangeListener(forwarder);
        }
    };

    propertyChangeSupport.addPropertyChangeListener(forwarder);
    subscriber.add(BooleanSubscription.create(detach));
}
Java 类 rx.subscriptions.BooleanSubscription 的实例源码
PropertyChangeListenerOnSubscribe.java 文件源码
项目:rx-mvvm-android
阅读 23
收藏 0
点赞 0
评论 0
RxManager.java 文件源码
项目:nextop-client
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Builds an observable over the managed state looked up via {@code state(id, true)}.
 *
 * <p>On each subscription the subscriber first gets a {@code BooleanSubscription} whose action
 * removes it from the state, and is then added to the state. The state itself is emitted
 * directly only when the state's {@code publishCount} did not change while the subscriber was
 * being added.
 *
 * @param id identifier passed to {@code state(id, true)}
 * @return an observable created from the subscription logic above
 */
private Observable<ManagedState> getState(final Id id) {
    final Observable.OnSubscribe<ManagedState> onSubscribe =
            new Observable.OnSubscribe<ManagedState>() {
                @Override
                public void call(final Subscriber<? super ManagedState> subscriber) {
                    final ManagedState managed = state(id, true);

                    final Action0 unregister = new Action0() {
                        @Override
                        public void call() {
                            managed.removeSubscriber(subscriber);
                        }
                    };
                    subscriber.add(BooleanSubscription.create(unregister));

                    final int countBeforeAdd = managed.publishCount;
                    managed.addSubscriber(subscriber);

                    // A changed publish count means adding the subscriber already published;
                    // skip the explicit emission to avoid double-publishing.
                    if (managed.publishCount != countBeforeAdd) {
                        return;
                    }
                    subscriber.onNext(managed);
                }
            };
    return Observable.create(onSubscribe);
}
ListenToValueEventsOnSubscribe.java 文件源码
项目:Firebase-Chat-Demo
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Attaches a value-event listener for the given subscriber to {@code query}, and adds to the
 * subscriber a {@code BooleanSubscription} whose action removes that listener from the query.
 *
 * @param subscriber the subscriber handed to the {@code RxValueListener} together with
 *                   {@code marshaller}
 */
@Override
public void call(Subscriber<? super T> subscriber) {
    // Keep the instance returned by the query; it is the one passed to removeEventListener.
    final ValueEventListener registered =
            query.addValueEventListener(new RxValueListener<>(subscriber, marshaller));

    final Action0 removeListener = new Action0() {
        @Override
        public void call() {
            query.removeEventListener(registered);
        }
    };
    subscriber.add(BooleanSubscription.create(removeListener));
}
ListenToValueEventsOnSubscribe.java 文件源码
项目:TripV1.0
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Registers an {@code RxValueListener} (built from the subscriber and {@code marshaller}) on
 * {@code query}, then gives the subscriber a {@code BooleanSubscription} whose action removes
 * the registered listener from the query.
 *
 * @param subscriber the subscriber the listener is created for
 */
@Override
public void call(Subscriber<? super T> subscriber) {
    final ValueEventListener attachedListener =
            query.addValueEventListener(new RxValueListener<>(subscriber, marshaller));

    // Cleanup action: removes exactly the listener instance returned by the query.
    final Action0 detachFromQuery = new Action0() {
        @Override
        public void call() {
            query.removeEventListener(attachedListener);
        }
    };
    subscriber.add(BooleanSubscription.create(detachFromQuery));
}
RxMock.java 文件源码
项目:rxpresso
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Creates an {@code AddUnsubscribe} around a {@code BooleanSubscription} whose action clears
 * the {@code first} member of the {@code mapSubject} entry stored for {@code observable}.
 *
 * @param observable key used to look up the entry in {@code mapSubject}
 * @return the new {@code AddUnsubscribe} instance
 */
private AddUnsubscribe clearOnUnsubscribe(final Object observable) {
    final Action0 clearFirst = new Action0() {
        @Override
        public void call() {
            mapSubject.get(observable).first.clear();
        }
    };
    return new AddUnsubscribe(BooleanSubscription.create(clearFirst));
}
RxLifecycleBinder.java 文件源码
项目:nextop-client
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Returns a {@code BooleanSubscription} whose action first calls {@code unsubscribe()} and
 * then replaces {@code subscribers} with the result of removing this bridge from it.
 *
 * @return the subscription wrapping that teardown action
 */
public Subscription outSubscription() {
    final Action0 teardown = new Action0() {
        @Override
        public void call() {
            unsubscribe();
            subscribers = subscribers.removing(Bridge.this);
        }
    };
    return BooleanSubscription.create(teardown);
}
ImmediateScheduler.java 文件源码
项目:boohee_v5.6
阅读 20
收藏 0
点赞 0
评论 0
/** Creates the inner scheduler with a fresh {@code BooleanSubscription} as its inner subscription. */
private InnerImmediateScheduler() {
    innerSubscription = new BooleanSubscription();
}
TrampolineScheduler.java 文件源码
项目:boohee_v5.6
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Creates the inner scheduler with freshly allocated state: two atomic integers
 * ({@code counter} and {@code wip}), an empty priority blocking queue and a new
 * {@code BooleanSubscription}.
 */
private InnerCurrentThreadScheduler() {
    counter = new AtomicInteger();
    queue = new PriorityBlockingQueue();
    innerSubscription = new BooleanSubscription();
    wip = new AtomicInteger();
}
TestScheduler.java 文件源码
项目:boohee_v5.6
阅读 29
收藏 0
点赞 0
评论 0
/** Creates the inner test scheduler and initialises {@code s} with a new {@code BooleanSubscription}. */
private InnerTestScheduler() {
    s = new BooleanSubscription();
}
ObservableTests.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Checks how an exception thrown from a user-supplied {@code onNext} is reported when the
 * source emits from a thread of its own.
 *
 * <p>The source starts a new thread that emits "1", "2", "three", "4" and then completes. The
 * subscriber parses every item with {@link Integer#parseInt(String)}, so the third item cannot
 * be parsed. The test expects exactly two items to have been counted and a
 * {@link NumberFormatException} to have been delivered to {@code onError}.
 *
 * <p>Original author's note: the error from the user-provided Observer is not handled by the
 * subscribe method's try/catch but by the AtomicObserver that wraps the provided Observer, so
 * the test passes only if that AtomicObserver functionality exists.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the emitter
 */
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final AtomicInteger parsedCount = new AtomicInteger();
    final AtomicReference<Throwable> captured = new AtomicReference<Throwable>();

    final OnSubscribe<String> source = new OnSubscribe<String>() {
        @Override
        public void call(final Subscriber<? super String> observer) {
            final BooleanSubscription s = new BooleanSubscription();
            final Runnable emitter = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (s.isUnsubscribed()) {
                            return;
                        }
                        observer.onNext("1");
                        observer.onNext("2");
                        observer.onNext("three");
                        observer.onNext("4");
                        observer.onComplete();
                    } finally {
                        // Always release the waiting test thread, even on early exit or failure.
                        done.countDown();
                    }
                }
            };
            new Thread(emitter).start();
        }
    };

    final Subscriber<String> sink = new Subscriber<String>() {
        @Override
        public void onNext(String v) {
            final int parsed = Integer.parseInt(v);
            System.out.println(parsed);
            parsedCount.incrementAndGet();
        }

        @Override
        public void onError(Throwable e) {
            captured.set(e);
            System.out.println("error");
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("completed");
        }
    };

    Observable.create(source).subscribe(sink);

    // Block until the emitter thread has finished its sequence.
    done.await();

    assertEquals(2, parsedCount.get());
    final Throwable failure = captured.get();
    assertNotNull(failure);
    if (failure instanceof NumberFormatException) {
        return;
    }
    fail("It should be a NumberFormatException");
}
ObservableTests.java 文件源码
项目:lakeside-java
阅读 24
收藏 0
点赞 0
评论 0
/**
 * Checks how an exception thrown from a user-supplied {@code onNext} is reported when the
 * source emits asynchronously.
 *
 * <p>The {@code OnSubscribeFunc} starts a new thread that emits "1", "2", "three", "4" followed
 * by completion, and returns the {@code BooleanSubscription} that the thread consults before
 * emitting. The subscriber parses each item with {@link Integer#parseInt(String)}, so "three"
 * cannot be parsed. The test expects a count of exactly two and a
 * {@link NumberFormatException} recorded by {@code onError}.
 *
 * <p>Original author's note: the error from the user-provided Observer is not handled by the
 * subscribe method's try/catch but by the AtomicObserver that wraps the provided Observer, so
 * the test passes only if that AtomicObserver functionality exists.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the emitter
 */
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicInteger received = new AtomicInteger();
    final AtomicReference<Throwable> reported = new AtomicReference<Throwable>();

    final OnSubscribeFunc<String> producer = new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> observer) {
            final BooleanSubscription s = new BooleanSubscription();
            final Runnable emitTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (s.isUnsubscribed()) {
                            return;
                        }
                        observer.onNext("1");
                        observer.onNext("2");
                        observer.onNext("three");
                        observer.onNext("4");
                        observer.onCompleted();
                    } finally {
                        // Release the waiting test thread regardless of how emission ended.
                        finished.countDown();
                    }
                }
            };
            new Thread(emitTask).start();
            return s;
        }
    };

    final Subscriber<String> consumer = new Subscriber<String>() {
        @Override
        public void onNext(String v) {
            final int value = Integer.parseInt(v);
            System.out.println(value);
            received.incrementAndGet();
        }

        @Override
        public void onError(Throwable e) {
            reported.set(e);
            System.out.println("error");
            e.printStackTrace();
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };

    Observable.create(producer).subscribe(consumer);

    // Wait for the asynchronous sequence to run to its end.
    finished.await();

    assertEquals(2, received.get());
    final Throwable problem = reported.get();
    assertNotNull(problem);
    if (problem instanceof NumberFormatException) {
        return;
    }
    fail("It should be a NumberFormatException");
}