/**
 * Processes one {@code TRight} value delivered to this subscriber.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Under {@code guard}: take the next {@code rightId}, store {@code args} in
 *       {@code rightMap} under that id, and snapshot the current {@code leftId}.</li>
 *   <li>Add a new {@code SerialSubscription} to {@code group}.</li>
 *   <li>Obtain the duration observable from {@code rightDurationSelector}, register a
 *       {@code RightDurationSubscriber} for the id in {@code group}, and subscribe it with
 *       {@code unsafeSubscribe}.</li>
 *   <li>Under {@code guard}: copy every {@code leftMap} value whose key is lower than the
 *       snapshotted {@code leftId}.</li>
 *   <li>Outside the lock: pass {@code resultSelector.call(left, args)} to the downstream
 *       {@code subscriber} for each copied value.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised in steps 3-5 is handed to
 * {@code Exceptions.throwOrReport(t, this)}.
 *
 * @param args the value to register and to combine with the stored {@code TLeft} values
 */
public void onNext(TRight args) {
    final ResultSink sink = ResultSink.this;

    // Declared outside the synchronized block because both values are needed after the
    // lock is released; they are assigned exactly once while the lock is held.
    final int id;
    final int highLeftId;
    synchronized (sink.guard) {
        id = sink.rightId++;
        sink.rightMap.put(id, args);
        highLeftId = sink.leftId;
    }

    // NOTE(review): this subscription is added to the group but never referenced again in
    // this method; kept so the group's contents stay exactly as before. Confirm whether it
    // is still needed.
    sink.group.add(new SerialSubscription());

    try {
        // The cast is retained because the selector's declared type is not visible here.
        Observable<TRightDuration> duration =
                (Observable<TRightDuration>) OnSubscribeJoin.this.rightDurationSelector.call(args);

        Subscriber<TRightDuration> durationSubscriber = new RightDurationSubscriber(id);
        sink.group.add(durationSubscriber);
        duration.unsafeSubscribe(durationSubscriber);

        // Copy the matching values while holding the lock, then emit after releasing it so
        // that downstream code is not invoked with the guard held.
        List<TLeft> leftValues = new ArrayList<TLeft>();
        synchronized (sink.guard) {
            for (Entry<Integer, TLeft> entry : sink.leftMap.entrySet()) {
                if (entry.getKey() < highLeftId) {
                    leftValues.add(entry.getValue());
                }
            }
        }

        for (TLeft leftValue : leftValues) {
            sink.subscriber.onNext(OnSubscribeJoin.this.resultSelector.call(leftValue, args));
        }
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, this);
    }
}
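/*
 * OnSubscribeJoin.java - file source code
 * Language: java
 * Views: 22
 * Favorites: 0
 * Likes: 0
 * Comments: 0
 * Project: boohee_v5.6
 * Author: (not given)
 * Comment list
 * Article table of contents
 */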