java类rx.subscriptions.MultipleAssignmentSubscription的实例源码

Scheduler.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
    final long periodInNanos = unit.toNanos(period);
    final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    final Action0 action0 = action;
    Action0 recursiveAction = new Action0() {
        long count = 0;

        public void call() {
            if (!mas.isUnsubscribed()) {
                action0.call();
                long j = startInNanos;
                long j2 = this.count + 1;
                this.count = j2;
                mas.set(Worker.this.schedule(this, (j + (j2 * periodInNanos)) - TimeUnit.MILLISECONDS.toNanos(Worker.this.now()), TimeUnit.NANOSECONDS));
            }
        }
    };
    MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
    mas.set(s);
    s.set(schedule(recursiveAction, initialDelay, unit));
    return mas;
}
HystrixMetricsStreamHandler.java 文件源码 项目:ReactiveLab 阅读 26 收藏 0 点赞 0 评论 0
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        writeMetric(JsonMapper.toJson(metrics), response);
                    } catch (Exception e) {
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
HystrixMetricsStreamHandler.java 文件源码 项目:ReactiveLab 阅读 26 收藏 0 点赞 0 评论 0
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(commandMetrics), response);
                        }
                        for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(threadPoolMetrics), response);
                        }
                    } catch (Exception e) {
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
ExecutorScheduler.java 文件源码 项目:org.openntf.domino 阅读 19 收藏 0 点赞 0 评论 0
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (delayTime <= 0) {
        return schedule(action);
    }
    if (isUnsubscribed()) {
        return Subscriptions.empty();
    }
    ScheduledExecutorService service;
    if (executor instanceof ScheduledExecutorService) {
        service = (ScheduledExecutorService)executor;
    } else {
        service = GenericScheduledExecutorService.getInstance();
    }

    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    // tasks.add(mas); // Needs a removal without unsubscription

    try {
        Future<?> f = service.schedule(new Runnable() {
            @Override
            public void run() {
                if (mas.isUnsubscribed()) {
                    return;
                }
                mas.set(schedule(action));
                // tasks.delete(mas); // Needs a removal without unsubscription
            }
        }, delayTime, unit);
        mas.set(Subscriptions.from(f));
    } catch (RejectedExecutionException t) {
        // report the rejection to plugins
        RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
        throw t;
    }

    return mas;
}
ExecutorScheduler.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (delayTime <= 0) {
        return schedule(action);
    }
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledExecutorService service;
    if (this.executor instanceof ScheduledExecutorService) {
        service = this.executor;
    } else {
        service = GenericScheduledExecutorService.getInstance();
    }
    MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    mas.set(first);
    this.tasks.add(mas);
    final Subscription removeMas = Subscriptions.create(new Action0() {
        public void call() {
            ExecutorSchedulerWorker.this.tasks.remove(mas);
        }
    });
    ScheduledAction ea = new ScheduledAction(new Action0() {
        public void call() {
            if (!mas.isUnsubscribed()) {
                Subscription s2 = ExecutorSchedulerWorker.this.schedule(action);
                mas.set(s2);
                if (s2.getClass() == ScheduledAction.class) {
                    ((ScheduledAction) s2).add(removeMas);
                }
            }
        }
    });
    first.set(ea);
    try {
        ea.add(service.schedule(ea, delayTime, unit));
        return removeMas;
    } catch (RejectedExecutionException t) {
        RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
        throw t;
    }
}
OnSubscribeExponentialDelay.java 文件源码 项目:android-common 阅读 14 收藏 0 点赞 0 评论 0
public Subscription schedulePeriodically(
    final Scheduler.Worker worker,
    final Action0 action,
    long initialDelay,
    Delay periodDelay,
    final TimeUnit unit) {
  final Delay period = periodDelay;
  final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now());
  final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);

  final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
  final Action0 recursiveAction =
      new Action0() {
        long count;
        long lastNowNanos = firstNowNanos;
        long startInNanos = firstStartInNanos;

        @Override
        public void call() {
          if (!mas.isUnsubscribed()) {
            action.call();
            long periodInNanos = unit.toNanos(period.calculate(count));
            long nextTick;

            long nowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now());
            // If the clock moved in a direction quite a bit, rebase the repetition period
            if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
                || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
              nextTick = nowNanos + periodInNanos;
              /*
               * Shift the start point back by the drift as if the whole thing
               * started count periods ago.
               */
              startInNanos = nextTick - (periodInNanos * (++count));
            } else {
              nextTick = startInNanos + (++count * periodInNanos);
            }
            lastNowNanos = nowNanos;

            long delay = nextTick - nowNanos;
            mas.set(worker.schedule(this, delay, TimeUnit.NANOSECONDS));
          }
        }
      };
  MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
  // Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
  mas.set(s);
  s.set(worker.schedule(recursiveAction, initialDelay, unit));
  return mas;
}
HystrixMetricsStreamHandler.java 文件源码 项目:Grapi 阅读 25 收藏 0 点赞 0 评论 0
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    if (!msg.uri().startsWith(urlMapping)) {
        ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        return;
    }

    logger.debug("Handling Hystrix stream request...");
    final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    response.headers().set(CONTENT_TYPE, "text/event-stream;charset=UTF-8");
    response.headers().set(CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
    response.headers().add(PRAGMA, NO_CACHE);
    ctx.writeAndFlush(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!ctx.channel().isOpen()) {
                        subscription.unsubscribe();
                        logger.debug("Stopping Hystrix Turbine stream to connection");
                        return;
                    }
                    try {
                        Collection<HystrixCommandMetrics> hystrixCommandMetrics = HystrixCommandMetrics.getInstances();
                        Collection<HystrixThreadPoolMetrics> hystrixThreadPoolMetrics = HystrixThreadPoolMetrics.getInstances();
                        logger.debug("Found {} hystrix command metrics", hystrixCommandMetrics.size());
                        logger.debug("Found {} hystrix thread pool metrics", hystrixThreadPoolMetrics.size());
                        for (HystrixCommandMetrics commandMetrics : hystrixCommandMetrics) {
                            writeMetric(toJson(commandMetrics), ctx);
                        }
                        for (HystrixThreadPoolMetrics threadPoolMetrics : hystrixThreadPoolMetrics) {
                            writeMetric(toJson(threadPoolMetrics), ctx);
                        }
                        if (hystrixCommandMetrics.isEmpty() && hystrixThreadPoolMetrics.isEmpty()) {
                            ctx.writeAndFlush(PING.duplicate()).addListener(CLOSE_ON_FAILURE);
                        } else {
                            ctx.flush();
                        }
                    } catch (Exception e) {
                        logger.error("Unexpected error", e);
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
}
Scheduler.java 文件源码 项目:org.openntf.domino 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Schedules a cancelable action to be executed periodically. This default implementation schedules
 * recursively and waits for actions to complete (instead of potentially executing long-running actions
 * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
 * <p>
 * Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
 * undelayed scheduling of the first and any subsequent executions.
 * 
 * @param action
 *            the Action to execute periodically
 * @param initialDelay
 *            time to wait before executing the action for the first time; non-positive values indicate
 *            an undelayed schedule
 * @param period
 *            the time interval to wait each time in between executing the action; non-positive values
 *            indicate no delay between repeated schedules
 * @param unit
 *            the time unit of {@code period}
 * @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
 */
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
    final long periodInNanos = unit.toNanos(period);
    final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);

    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    final Action0 recursiveAction = new Action0() {
        long count = 0;
        @Override
        public void call() {
            if (!mas.isUnsubscribed()) {
                action.call();
                long nextTick = startInNanos + (++count * periodInNanos);
                mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
            }
        }
    };
    mas.set(schedule(recursiveAction, initialDelay, unit));
    return mas;
}


问题


面经


文章

微信
公众号

扫码关注公众号