OnSubscribeJoin.java 文件源码

java
阅读 21 收藏 0 点赞 0 评论 0

项目:RxJavaFlow 作者:
/**
 * Handles one value emitted by the right-hand source.
 *
 * <p>The value is stored in {@code rightMap} under a freshly allocated id, a
 * duration observable is obtained from {@code rightDurationSelector} and
 * subscribed to with a {@code RightDurationSubscriber} for that id, and the
 * value is then combined through {@code resultSelector} with every left value
 * whose id is below the {@code leftId} observed at registration time. Each
 * combined result is forwarded to {@code subscriber}.
 *
 * <p>Any {@link Throwable} raised by the selectors, the duration subscription
 * or the downstream {@code onNext} is passed to {@code onError}.
 *
 * @param args the right-hand value to register and pair with left values
 */
@Override
public void onNext(TRight args) {
    final int id;
    final int highLeftId;
    // Allocate the id, publish the value and snapshot the left id counter in
    // one critical section so the three are mutually consistent.
    synchronized (guard) {
        id = rightId++;
        rightMap.put(id, args);
        highLeftId = leftId;
    }

    try {
        Observable<TRightDuration> duration = rightDurationSelector.call(args);

        // Only the duration subscriber is registered with the group; nothing
        // else created by this call needs to be tracked for unsubscription.
        // NOTE(review): presumably RightDurationSubscriber expires the entry
        // for this id when the duration ends - confirm against its definition.
        Subscriber<TRightDuration> d2 = new RightDurationSubscriber(id);
        group.add(d2);

        duration.unsafeSubscribe(d2);

        // Copy the matching left values while holding the lock, then emit
        // outside it so selector and downstream code never run under guard.
        List<TLeft> leftValues = new ArrayList<TLeft>();
        synchronized (guard) {
            for (Map.Entry<Integer, TLeft> entry : leftMap.entrySet()) {
                // Only pair with left values whose id is below the snapshot.
                if (entry.getKey() < highLeftId) {
                    leftValues.add(entry.getValue());
                }
            }
        }

        for (TLeft lv : leftValues) {
            R result = resultSelector.call(lv, args);
            subscriber.onNext(result);
        }

    } catch (Throwable t) {
        onError(t);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号