OnSubscribeJoin.java 文件源码

java
阅读 22 收藏 0 点赞 0 评论 0

项目:boohee_v5.6 作者:
/**
 * Handles a value arriving from the right-hand source.
 *
 * <p>Under {@code guard}, the value is stored in {@code rightMap} under a freshly
 * allocated right id and the current {@code leftId} is captured. A duration
 * observable is then obtained from {@code rightDurationSelector} and subscribed
 * with a {@code RightDurationSubscriber} for that id. Finally the value is paired,
 * via {@code resultSelector}, with every entry of {@code leftMap} whose key is
 * below the captured {@code leftId}, and each result is passed downstream.
 *
 * <p>Any {@link Throwable} raised by the selectors or by the downstream
 * subscriber is handed to {@code Exceptions.throwOrReport(t, this)}.
 *
 * @param args the right-hand value that was just emitted
 */
public void onNext(TRight args) {
    // Declared outside the lock because both values are needed after it is released.
    int id;
    int highLeftId;
    synchronized (ResultSink.this.guard) {
        id = ResultSink.this.rightId;
        ResultSink.this.rightId = id + 1;
        ResultSink.this.rightMap.put(Integer.valueOf(id), args);
        // Captured in the same critical section as the id allocation; only left
        // entries with a key below this bound are paired with this value.
        highLeftId = ResultSink.this.leftId;
    }
    // A new SerialSubscription is registered with the group; this method keeps no reference to it.
    ResultSink.this.group.add(new SerialSubscription());
    try {
        @SuppressWarnings("unchecked")
        Observable<TRightDuration> duration =
                (Observable<TRightDuration>) OnSubscribeJoin.this.rightDurationSelector.call(args);
        Subscriber<TRightDuration> d2 = new RightDurationSubscriber(id);
        ResultSink.this.group.add(d2);
        duration.unsafeSubscribe(d2);

        // Copy the matching left values while holding the lock...
        List<TLeft> leftValues = new ArrayList<TLeft>();
        synchronized (ResultSink.this.guard) {
            for (Entry<Integer, TLeft> entry : ResultSink.this.leftMap.entrySet()) {
                if (entry.getKey() < highLeftId) {
                    leftValues.add(entry.getValue());
                }
            }
        }
        // ...and invoke resultSelector and the downstream subscriber after releasing it.
        for (TLeft lv : leftValues) {
            ResultSink.this.subscriber.onNext(OnSubscribeJoin.this.resultSelector.call(lv, args));
        }
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, this);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号