void doSubscribe(final Subscriber<? super T> subscriber, final CompositeSubscription currentBase) {
subscriber.add(disconnect(currentBase));
this.source.unsafeSubscribe(new Subscriber<T>(subscriber) {
public void onError(Throwable e) {
cleanup();
subscriber.onError(e);
}
public void onNext(T t) {
subscriber.onNext(t);
}
public void onCompleted() {
cleanup();
subscriber.onCompleted();
}
void cleanup() {
OnSubscribeRefCount.this.lock.lock();
try {
if (OnSubscribeRefCount.this.baseSubscription == currentBase) {
OnSubscribeRefCount.this.baseSubscription.unsubscribe();
OnSubscribeRefCount.this.baseSubscription = new CompositeSubscription();
OnSubscribeRefCount.this.subscriptionCount.set(0);
}
OnSubscribeRefCount.this.lock.unlock();
} catch (Throwable th) {
OnSubscribeRefCount.this.lock.unlock();
}
}
});
}
OnSubscribeRefCount.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:boohee_v5.6
作者:
评论列表
文章目录