/**
 * Handles a value arriving from the right-hand source.
 *
 * <p>The value is stored in {@code rightMap} under a freshly allocated id, its
 * duration observable is obtained from {@code rightDurationSelector} and
 * subscribed to, and the value is then combined (via {@code resultSelector})
 * with every left value whose id is lower than the {@code leftId} counter as it
 * was when this value was registered. Each combined result is passed to
 * {@code subscriber.onNext}.
 *
 * <p>Any {@link Throwable} raised by the selectors, by the duration
 * subscription, or by the downstream {@code onNext} is forwarded to
 * {@link #onError(Throwable)}.
 *
 * @param args the value emitted by the right-hand source
 */
@Override
public void onNext(TRight args) {
    int id;
    int highLeftId;
    // Allocate the id, store the value and sample the left id counter in one
    // critical section so the three stay consistent with each other.
    synchronized (guard) {
        id = rightId++;
        rightMap.put(id, args);
        highLeftId = leftId;
    }
    // NOTE: an empty SerialSubscription used to be added to `group` here. It was
    // never assigned, read or removed, so it only accumulated one dead entry
    // per right value; it has been dropped.
    try {
        Observable<TRightDuration> duration = rightDurationSelector.call(args);

        // Register the duration subscriber with the group before subscribing,
        // so it is already tracked by the time the duration can emit.
        Subscriber<TRightDuration> d2 = new RightDurationSubscriber(id);
        group.add(d2);
        duration.unsafeSubscribe(d2);

        // Copy the eligible left values while holding the lock; the selector
        // and the downstream subscriber are then invoked without it.
        List<TLeft> leftValues = new ArrayList<TLeft>();
        synchronized (guard) {
            for (Map.Entry<Integer, TLeft> entry : leftMap.entrySet()) {
                if (entry.getKey() < highLeftId) {
                    leftValues.add(entry.getValue());
                }
            }
        }

        for (TLeft lv : leftValues) {
            R result = resultSelector.call(lv, args);
            subscriber.onNext(result);
        }
    } catch (Throwable t) {
        onError(t);
    }
}
OnSubscribeJoin.java 文件源码
java
阅读 21
收藏 0
点赞 0
评论 0
项目:RxJavaFlow
作者:
评论列表
文章目录