/**
 * Wraps the given downstream subscriber in a {@code TimeoutSubscriber}.
 *
 * <p>A worker is created from {@code scheduler} and, together with a fresh
 * {@code SerialSubscription}, is added to {@code subscriber}. The subscriber is
 * then wrapped in a {@code SerializedSubscriber}, which is handed to the new
 * {@code TimeoutSubscriber}. Finally {@code firstTimeoutStub} is invoked and the
 * subscription it returns is stored in the serial subscription.
 *
 * @param subscriber the downstream subscriber to wrap
 * @return the {@code TimeoutSubscriber} that delegates to the serialized wrapper
 */
@Override
public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
    final Scheduler.Worker worker = scheduler.createWorker();
    final SerialSubscription timeoutHandle = new SerialSubscription();

    // Register both resources on the downstream subscriber, worker first.
    subscriber.add(worker);
    subscriber.add(timeoutHandle);

    // A SerializedSubscriber is used for safe memory access, because the
    // downstream subscriber may be reached from the current thread, from the
    // scheduler, or from other Observables.
    final SerializedSubscriber<T> serialized = new SerializedSubscriber<T>(subscriber);

    final TimeoutSubscriber<T> result =
            new TimeoutSubscriber<T>(serialized, timeoutStub, timeoutHandle, other, worker);

    // Obtain the initial subscription from firstTimeoutStub (called with 0L)
    // and store it in the serial subscription.
    final Subscription initialTimeout = firstTimeoutStub.call(result, 0L, worker);
    timeoutHandle.set(initialTimeout);

    return result;
}
OperatorTimeoutBase.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:org.openntf.domino
作者:
评论列表
文章目录