java类rx.subscriptions.BooleanSubscription的实例源码

PropertyChangeListenerOnSubscribe.java 文件源码 项目:rx-mvvm-android 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Forwards new values of the watched property to {@code subscriber}.
 *
 * <p>Registers a listener on {@code propertyChangeSupport} and attaches a subscription to
 * {@code subscriber} whose unsubscribe action removes that listener again.
 *
 * @param subscriber receiver of each new property value, cast to {@code T}
 */
@Override
@SuppressWarnings("unchecked")
public void call(final Subscriber<? super T> subscriber) {
    final PropertyChangeListener forwarder = new PropertyChangeListener() {
        @Override
        public void propertyChange(PropertyChangeEvent change) {
            // Nothing to deliver once the subscriber has gone away.
            if (subscriber.isUnsubscribed()) {
                return;
            }
            // Only events for the watched property name are forwarded.
            if (!property.equals(change.getPropertyName())) {
                return;
            }
            // Unchecked cast: the event exposes its new value as a plain Object.
            subscriber.onNext((T) change.getNewValue());
        }
    };
    final Action0 detach = new Action0() {
        @Override
        public void call() {
            propertyChangeSupport.removePropertyChangeListener(forwarder);
        }
    };

    propertyChangeSupport.addPropertyChangeListener(forwarder);
    subscriber.add(BooleanSubscription.create(detach));
}
RxManager.java 文件源码 项目:nextop-client 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Builds an observable over the managed state for {@code id}.
 *
 * <p>On each subscription the state is looked up via {@code state(id, true)}, the subscriber is
 * registered with it, and the state is emitted once unless {@code publishCount} changed while
 * the subscriber was being registered.
 *
 * @param id identifier passed to {@code state(id, true)}
 * @return an observable created from the subscription logic above
 */
private Observable<ManagedState> getState(final Id id) {
    final Observable.OnSubscribe<ManagedState> onSubscribe = new Observable.OnSubscribe<ManagedState>() {
        @Override
        public void call(final Subscriber<? super ManagedState> subscriber) {
            final ManagedState managed = state(id, true);

            final Action0 deregister = new Action0() {
                @Override
                public void call() {
                    managed.removeSubscriber(subscriber);
                }
            };
            // The removal hook is attached before the subscriber is registered; order preserved.
            subscriber.add(BooleanSubscription.create(deregister));

            final int countBeforeRegistering = managed.publishCount;
            managed.addSubscriber(subscriber);
            // Emit only when the publish count did not move during registration,
            // to avoid double-publishing.
            if (countBeforeRegistering == managed.publishCount) {
                subscriber.onNext(managed);
            }
        }
    };
    return Observable.create(onSubscribe);
}
ListenToValueEventsOnSubscribe.java 文件源码 项目:Firebase-Chat-Demo 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Attaches a value listener built for {@code subscriber} to {@code query}, and arranges for
 * that listener to be removed from the query when the subscriber unsubscribes.
 *
 * @param subscriber passed, together with {@code marshaller}, to the new {@code RxValueListener}
 */
@Override
public void call(Subscriber<? super T> subscriber) {
    final ValueEventListener registered =
            query.addValueEventListener(new RxValueListener<>(subscriber, marshaller));
    final Action0 detach = new Action0() {
        @Override
        public void call() {
            query.removeEventListener(registered);
        }
    };
    subscriber.add(BooleanSubscription.create(detach));
}
ListenToValueEventsOnSubscribe.java 文件源码 项目:TripV1.0 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Registers an {@code RxValueListener} for {@code subscriber} on {@code query} and hooks its
 * removal into the subscriber's unsubscribe handling.
 *
 * @param subscriber passed, together with {@code marshaller}, to the new {@code RxValueListener}
 */
@Override
public void call(Subscriber<? super T> subscriber) {
    final ValueEventListener attachedListener =
            query.addValueEventListener(new RxValueListener<>(subscriber, marshaller));
    final Action0 removeListener = new Action0() {
        @Override
        public void call() {
            query.removeEventListener(attachedListener);
        }
    };
    subscriber.add(BooleanSubscription.create(removeListener));
}
RxMock.java 文件源码 项目:rxpresso 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Creates an {@code AddUnsubscribe} wrapping a subscription whose unsubscribe action clears
 * the {@code first} member of the {@code mapSubject} entry stored under {@code observable}.
 *
 * @param observable key used to look up the entry in {@code mapSubject} at unsubscribe time
 * @return a new {@code AddUnsubscribe} carrying that subscription
 */
private AddUnsubscribe clearOnUnsubscribe(final Object observable) {
    final Action0 clearEntry = new Action0() {
        @Override
        public void call() {
            // The lookup happens when the action runs, not when it is created.
            mapSubject.get(observable).first.clear();
        }
    };
    return new AddUnsubscribe(BooleanSubscription.create(clearEntry));
}
RxLifecycleBinder.java 文件源码 项目:nextop-client 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Returns a subscription whose unsubscribe action calls {@code unsubscribe()} and then
 * replaces {@code subscribers} with a copy that no longer contains this {@code Bridge}.
 *
 * @return a new subscription wrapping that action
 */
public Subscription outSubscription() {
    final Action0 release = new Action0() {
        @Override
        public void call() {
            unsubscribe();
            subscribers = subscribers.removing(Bridge.this);
        }
    };
    return BooleanSubscription.create(release);
}
ImmediateScheduler.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
/** Creates the inner scheduler with a fresh {@code BooleanSubscription} as its subscription. */
private InnerImmediateScheduler() {
    innerSubscription = new BooleanSubscription();
}
TrampolineScheduler.java 文件源码 项目:boohee_v5.6 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Creates the inner scheduler with a new counter, an empty priority queue, a fresh
 * {@code BooleanSubscription} and a new {@code wip} counter.
 */
private InnerCurrentThreadScheduler() {
    counter = new AtomicInteger();
    queue = new PriorityBlockingQueue();
    innerSubscription = new BooleanSubscription();
    wip = new AtomicInteger();
}
TestScheduler.java 文件源码 项目:boohee_v5.6 阅读 29 收藏 0 点赞 0 评论 0
/** Creates the inner test scheduler with a fresh {@code BooleanSubscription} in {@code s}. */
private InnerTestScheduler() {
    s = new BooleanSubscription();
}
ObservableTests.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Checks that an exception thrown by a subscriber's {@code onNext} during asynchronous
 * emission is delivered to that subscriber's {@code onError}.
 *
 * <p>A background thread emits "1", "2", "three", "4"; the subscriber parses each value as an
 * integer. The test expects exactly two values to have been counted and a
 * {@link NumberFormatException} to have reached {@code onError}.
 *
 * <p>Original note: the error from the user-provided Observer is not handled by the subscribe
 * method's try/catch but by the AtomicObserver that wraps the provided Observer, so the test
 * passes only if that functionality exists.
 */
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
    final CountDownLatch emissionDone = new CountDownLatch(1);
    final AtomicInteger parsedCount = new AtomicInteger();
    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();

    final OnSubscribe<String> source = new OnSubscribe<String>() {

        @Override
        public void call(final Subscriber<? super String> observer) {
            final BooleanSubscription s = new BooleanSubscription();
            final Runnable emitter = new Runnable() {

                @Override
                public void run() {
                    try {
                        if (s.isUnsubscribed()) {
                            return;
                        }
                        for (String value : new String[] {"1", "2", "three", "4"}) {
                            observer.onNext(value);
                        }
                        observer.onComplete();
                    } finally {
                        // Always release the waiting test thread, even if emission fails.
                        emissionDone.countDown();
                    }
                }
            };
            new Thread(emitter).start();
        }
    };

    final Subscriber<String> parsingSubscriber = new Subscriber<String>() {
        @Override
        public void onComplete() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            failure.set(e);
            System.out.println("error");
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            // Throws NumberFormatException for the non-numeric "three".
            final int parsed = Integer.parseInt(v);
            System.out.println(parsed);
            parsedCount.incrementAndGet();
        }

    };

    Observable.create(source).subscribe(parsingSubscriber);

    // wait for async sequence to complete
    emissionDone.await();

    assertEquals(2, parsedCount.get());
    final Throwable captured = failure.get();
    assertNotNull(captured);
    if (!(captured instanceof NumberFormatException)) {
        fail("It should be a NumberFormatException");
    }
}
ObservableTests.java 文件源码 项目:lakeside-java 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Checks that an exception thrown by a subscriber's {@code onNext} during asynchronous
 * emission is delivered to that subscriber's {@code onError}.
 *
 * <p>A background thread emits "1", "2", "three", "4"; the subscriber parses each value as an
 * integer. The test expects exactly two values to have been counted and a
 * {@link NumberFormatException} to have reached {@code onError}.
 *
 * <p>Original note: the error from the user-provided Observer is not handled by the subscribe
 * method's try/catch but by the AtomicObserver that wraps the provided Observer, so the test
 * passes only if that functionality exists.
 */
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
    final CountDownLatch emissionDone = new CountDownLatch(1);
    final AtomicInteger parsedCount = new AtomicInteger();
    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();

    final OnSubscribeFunc<String> source = new OnSubscribeFunc<String>() {

        @Override
        public Subscription onSubscribe(final Observer<? super String> observer) {
            final BooleanSubscription s = new BooleanSubscription();
            final Runnable emitter = new Runnable() {

                @Override
                public void run() {
                    try {
                        if (s.isUnsubscribed()) {
                            return;
                        }
                        for (String value : new String[] {"1", "2", "three", "4"}) {
                            observer.onNext(value);
                        }
                        observer.onCompleted();
                    } finally {
                        // Always release the waiting test thread, even if emission fails.
                        emissionDone.countDown();
                    }
                }
            };
            new Thread(emitter).start();
            // The same subscription the emitter checks is handed back to the caller.
            return s;
        }
    };

    final Subscriber<String> parsingSubscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            failure.set(e);
            System.out.println("error");
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            // Throws NumberFormatException for the non-numeric "three".
            final int parsed = Integer.parseInt(v);
            System.out.println(parsed);
            parsedCount.incrementAndGet();
        }

    };

    Observable.create(source).subscribe(parsingSubscriber);

    // wait for async sequence to complete
    emissionDone.await();

    assertEquals(2, parsedCount.get());
    final Throwable captured = failure.get();
    assertNotNull(captured);
    if (!(captured instanceof NumberFormatException)) {
        fail("It should be a NumberFormatException");
    }
}


问题


面经


文章

微信
公众号

扫码关注公众号