/**
 * Runs {@code action} repeatedly on this worker.
 * <p>
 * The first run is scheduled after {@code initialDelay}. After each run the next one is
 * scheduled for {@code start + n * period}, where {@code start} is the time this method was
 * called plus {@code initialDelay} and {@code n} is the number of runs completed so far.
 * The delay passed to {@code schedule} is that target time minus the current time.
 *
 * @param action       the work to execute on every tick
 * @param initialDelay time to wait before the first execution, in {@code unit}
 * @param period       distance between two consecutive target times, in {@code unit}
 * @param unit         time unit of {@code initialDelay} and {@code period}
 * @return a subscription that, once unsubscribed, prevents further runs from being started
 */
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
    final Action0 task = action;
    final long stepNanos = unit.toNanos(period);
    final long originNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
    final MultipleAssignmentSubscription handle = new MultipleAssignmentSubscription();

    Action0 ticker = new Action0() {
        /** Number of completed runs. */
        long ticks = 0;

        @Override
        public void call() {
            if (handle.isUnsubscribed()) {
                return;
            }
            task.call();
            this.ticks++;
            long dueNanos = originNanos + (this.ticks * stepNanos);
            long nowNanos = TimeUnit.MILLISECONDS.toNanos(Worker.this.now());
            handle.set(Worker.this.schedule(this, dueNanos - nowNanos, TimeUnit.NANOSECONDS));
        }
    };

    // Install a placeholder in the handle before scheduling, so that a first run which
    // finishes early and stores its follow-up in the handle is not overwritten below.
    MultipleAssignmentSubscription firstRun = new MultipleAssignmentSubscription();
    handle.set(firstRun);
    firstRun.set(schedule(ticker, initialDelay, unit));
    return handle;
}
java类rx.subscriptions.MultipleAssignmentSubscription的实例源码
Scheduler.java 文件源码
项目:boohee_v5.6
阅读 20
收藏 0
点赞 0
评论 0
HystrixMetricsStreamHandler.java 文件源码
项目:ReactiveLab
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Writes the response headers and then starts a timer that writes the JSON form of
 * {@code metrics} to {@code response} every {@code interval} milliseconds.
 * <p>
 * The timer is stopped when the response channel is no longer open, or when writing a metric
 * throws. In the latter case the exception is also signalled through the returned observable.
 *
 * @param response the HTTP response to stream metrics into
 * @return an observable that signals {@code onError} if writing a metric fails
 */
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);
    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder for the timer subscription so the callback can cancel its own timer.
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        writeMetric(JsonMapper.toJson(metrics), response);
                    } catch (Exception e) {
                        // onError is a terminal signal: stop the timer first so later ticks
                        // neither retry the write nor signal the subject again.
                        subscription.unsubscribe();
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
HystrixMetricsStreamHandler.java 文件源码
项目:ReactiveLab
阅读 26
收藏 0
点赞 0
评论 0
/**
 * Writes the response headers and then starts a timer that, every {@code interval}
 * milliseconds, writes the JSON form of every {@link HystrixCommandMetrics} instance followed
 * by every {@link HystrixThreadPoolMetrics} instance to {@code response}.
 * <p>
 * The timer is stopped when the response channel is no longer open, or when writing a metric
 * throws. In the latter case the exception is also signalled through the returned observable.
 *
 * @param response the HTTP response to stream metrics into
 * @return an observable that signals {@code onError} if writing a metric fails
 */
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);
    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder for the timer subscription so the callback can cancel its own timer.
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!response.getChannel().isOpen()) {
                        subscription.unsubscribe();
                        return;
                    }
                    try {
                        for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(commandMetrics), response);
                        }
                        for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
                            writeMetric(JsonMapper.toJson(threadPoolMetrics), response);
                        }
                    } catch (Exception e) {
                        // onError is a terminal signal: stop the timer first so later ticks
                        // neither retry the writes nor signal the subject again.
                        subscription.unsubscribe();
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
    return subject;
}
ExecutorScheduler.java 文件源码
项目:org.openntf.domino
阅读 19
收藏 0
点赞 0
评论 0
/**
 * Schedules {@code action} to be handed to the undelayed {@code schedule(action)} after the
 * given delay.
 * <p>
 * A non-positive delay delegates directly to {@code schedule(action)}. If this worker is
 * already unsubscribed, nothing is scheduled. The timing is done by {@code executor} when it
 * is a {@link ScheduledExecutorService}, otherwise by the shared
 * {@code GenericScheduledExecutorService}.
 *
 * @param action    the work to run
 * @param delayTime how long to wait before running, in {@code unit}
 * @param unit      time unit of {@code delayTime}
 * @return a subscription that cancels the pending timer or, once the timer has fired,
 *         the scheduled action
 * @throws RejectedExecutionException if the timing service rejects the task; the rejection
 *         is reported to the plugin error handler before being rethrown
 */
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (delayTime <= 0) {
        return schedule(action);
    }
    if (isUnsubscribed()) {
        return Subscriptions.empty();
    }
    ScheduledExecutorService service;
    if (executor instanceof ScheduledExecutorService) {
        service = (ScheduledExecutorService) executor;
    } else {
        service = GenericScheduledExecutorService.getInstance();
    }
    // 'first' tracks the timer future; 'mas' is what the caller holds. The delayed task
    // replaces 'first' inside 'mas' with the real action's subscription. Storing the future
    // in 'first' rather than directly in 'mas' means that, if the task has already run by
    // the time the future is stored, the action's subscription is not overwritten.
    final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    mas.set(first);
    // tasks.add(mas); // Needs a removal without unsubscription
    try {
        Future<?> f = service.schedule(new Runnable() {
            @Override
            public void run() {
                if (mas.isUnsubscribed()) {
                    return;
                }
                mas.set(schedule(action));
                // tasks.delete(mas); // Needs a removal without unsubscription
            }
        }, delayTime, unit);
        first.set(Subscriptions.from(f));
    } catch (RejectedExecutionException t) {
        // report the rejection to plugins
        RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
        throw t;
    }
    return mas;
}
ExecutorScheduler.java 文件源码
项目:boohee_v5.6
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Schedules {@code action} to be handed to the undelayed {@code schedule(action)} after the
 * given delay.
 * <p>
 * A non-positive delay delegates directly to {@code schedule(action)}. If this worker is
 * already unsubscribed, an unsubscribed subscription is returned and nothing is scheduled.
 * The timing is done by {@code executor} when it is a {@link ScheduledExecutorService},
 * otherwise by the shared {@code GenericScheduledExecutorService}.
 *
 * @param action    the work to run
 * @param delayTime how long to wait before running, in {@code unit}
 * @param unit      time unit of {@code delayTime}
 * @return a subscription whose unsubscription removes this task's tracking entry from
 *         {@code tasks}
 * @throws RejectedExecutionException if the timing service rejects the task; the rejection
 *         is reported to the plugin error handler before being rethrown
 */
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (delayTime <= 0) {
        return schedule(action);
    }
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledExecutorService service;
    if (this.executor instanceof ScheduledExecutorService) {
        // The instanceof check above makes this narrowing cast safe.
        service = (ScheduledExecutorService) this.executor;
    } else {
        service = GenericScheduledExecutorService.getInstance();
    }
    // 'first' tracks the timer; 'mas' is the entry registered in 'tasks'. The delayed task
    // replaces 'first' inside 'mas' with the real action's subscription.
    MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    mas.set(first);
    this.tasks.add(mas);
    final Subscription removeMas = Subscriptions.create(new Action0() {
        @Override
        public void call() {
            ExecutorSchedulerWorker.this.tasks.remove(mas);
        }
    });
    ScheduledAction ea = new ScheduledAction(new Action0() {
        @Override
        public void call() {
            if (!mas.isUnsubscribed()) {
                Subscription s2 = ExecutorSchedulerWorker.this.schedule(action);
                mas.set(s2);
                // Only a ScheduledAction can carry the clean-up subscription.
                if (s2.getClass() == ScheduledAction.class) {
                    ((ScheduledAction) s2).add(removeMas);
                }
            }
        }
    });
    // Store the timer in 'first' before starting it, so that a timer which fires
    // immediately and updates 'mas' is not overwritten afterwards.
    first.set(ea);
    try {
        ea.add(service.schedule(ea, delayTime, unit));
        return removeMas;
    } catch (RejectedExecutionException t) {
        // Report the rejection to plugins, then let the caller see it as well.
        RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
        throw t;
    }
}
OnSubscribeExponentialDelay.java 文件源码
项目:android-common
阅读 14
收藏 0
点赞 0
评论 0
/**
 * Runs {@code action} repeatedly on {@code worker}, with the distance between runs supplied
 * per repetition by {@code periodDelay}.
 * <p>
 * After each run the period for the next repetition is obtained from
 * {@code periodDelay.calculate(count)} and converted from {@code unit} to nanoseconds.
 * Normally the next run is targeted at {@code start + count * period}. If the worker's clock
 * has gone backwards by more than {@code CLOCK_DRIFT_TOLERANCE_NANOS}, or has advanced past
 * the previous reading by at least one period plus that tolerance, the next run is instead
 * targeted one period from now and the start point is rebased accordingly.
 *
 * @param worker       the worker on which every run is scheduled and whose clock is read
 * @param action       the work to execute on every repetition
 * @param initialDelay time to wait before the first execution, in {@code unit}
 * @param periodDelay  supplies the period for each repetition from the repetition count
 * @param unit         time unit of {@code initialDelay} and of the values from {@code periodDelay}
 * @return a subscription that, once unsubscribed, prevents further runs from being started
 */
public Subscription schedulePeriodically(
        final Scheduler.Worker worker,
        final Action0 action,
        long initialDelay,
        Delay periodDelay,
        final TimeUnit unit) {
    final Delay delayPolicy = periodDelay;
    final long initialNowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now());
    final long initialStartNanos = initialNowNanos + unit.toNanos(initialDelay);
    final MultipleAssignmentSubscription handle = new MultipleAssignmentSubscription();

    final Action0 ticker = new Action0() {
        /** Number of completed runs. */
        long runs;
        /** Clock reading taken at the end of the previous run. */
        long previousNowNanos = initialNowNanos;
        /** Reference point from which regular ticks are computed. */
        long originNanos = initialStartNanos;

        @Override
        public void call() {
            if (handle.isUnsubscribed()) {
                return;
            }
            action.call();

            // The period is derived from the count before it is incremented.
            final long stepNanos = unit.toNanos(delayPolicy.calculate(runs));
            final long nowNanos = TimeUnit.MILLISECONDS.toNanos(worker.now());
            runs++;

            // If the clock moved in a direction quite a bit, rebase the repetition period
            final boolean wentBackwards = nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < previousNowNanos;
            final boolean clockJumped = wentBackwards
                    || nowNanos >= previousNowNanos + stepNanos + CLOCK_DRIFT_TOLERANCE_NANOS;

            final long dueNanos;
            if (clockJumped) {
                dueNanos = nowNanos + stepNanos;
                // Shift the start point back by the drift as if the whole thing
                // started 'runs' periods ago.
                originNanos = dueNanos - (stepNanos * runs);
            } else {
                dueNanos = originNanos + (runs * stepNanos);
            }

            previousNowNanos = nowNanos;
            handle.set(worker.schedule(this, dueNanos - nowNanos, TimeUnit.NANOSECONDS));
        }
    };

    // Should call `handle.set` before `schedule`, or the new Subscription may replace the old one.
    MultipleAssignmentSubscription firstRun = new MultipleAssignmentSubscription();
    handle.set(firstRun);
    firstRun.set(worker.schedule(ticker, initialDelay, unit));
    return handle;
}
HystrixMetricsStreamHandler.java 文件源码
项目:Grapi
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Handles requests whose URI starts with {@code urlMapping} by sending event-stream response
 * headers and then starting a timer that writes all Hystrix command and thread-pool metrics
 * to the channel every {@code interval} milliseconds. A ping is written instead when there
 * are no metrics at all. Other requests are retained and passed on to the next handler.
 * <p>
 * The timer is cancelled once the channel is found to be closed.
 *
 * @param ctx the channel handler context to write the stream to
 * @param msg the incoming HTTP request
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    final boolean isStreamRequest = msg.uri().startsWith(urlMapping);
    if (!isStreamRequest) {
        // Not ours: retain the message and forward it down the pipeline.
        ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        return;
    }

    logger.debug("Handling Hystrix stream request...");

    final HttpResponse head = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    head.headers()
            .set(CONTENT_TYPE, "text/event-stream;charset=UTF-8")
            .set(CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate")
            .add(PRAGMA, NO_CACHE);
    ctx.writeAndFlush(head);

    final Subject<Void, Void> subject = PublishSubject.create();
    // Holder for the timer subscription so the callback can cancel its own timer.
    final MultipleAssignmentSubscription timerHandle = new MultipleAssignmentSubscription();

    timerHandle.set(
            Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long tick) {
                            if (!ctx.channel().isOpen()) {
                                timerHandle.unsubscribe();
                                logger.debug("Stopping Hystrix Turbine stream to connection");
                                return;
                            }
                            try {
                                Collection<HystrixCommandMetrics> commands = HystrixCommandMetrics.getInstances();
                                Collection<HystrixThreadPoolMetrics> threadPools = HystrixThreadPoolMetrics.getInstances();
                                logger.debug("Found {} hystrix command metrics", commands.size());
                                logger.debug("Found {} hystrix thread pool metrics", threadPools.size());

                                for (HystrixCommandMetrics command : commands) {
                                    writeMetric(toJson(command), ctx);
                                }
                                for (HystrixThreadPoolMetrics threadPool : threadPools) {
                                    writeMetric(toJson(threadPool), ctx);
                                }

                                final boolean hasMetrics = !commands.isEmpty() || !threadPools.isEmpty();
                                if (hasMetrics) {
                                    ctx.flush();
                                } else {
                                    // Nothing to report: write a ping instead.
                                    ctx.writeAndFlush(PING.duplicate()).addListener(CLOSE_ON_FAILURE);
                                }
                            } catch (Exception e) {
                                logger.error("Unexpected error", e);
                                subject.onError(e);
                            }
                        }
                    }));
}
Scheduler.java 文件源码
项目:org.openntf.domino
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Schedules a cancelable action to be executed periodically. This default implementation schedules
 * recursively and waits for actions to complete (instead of potentially executing long-running actions
 * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
 * <p>
 * Each run is targeted at {@code start + n * period}, where {@code start} is the time this method
 * was called plus {@code initialDelay} and {@code n} is the number of runs completed so far.
 * <p>
 * Note to implementors: non-positive {@code initialDelay} and {@code period} should be regarded as
 * undelayed scheduling of the first and any subsequent executions.
 *
 * @param action
 *            the Action to execute periodically
 * @param initialDelay
 *            time to wait before executing the action for the first time; non-positive values indicate
 *            an undelayed schedule
 * @param period
 *            the time interval to wait each time in between executing the action; non-positive values
 *            indicate no delay between repeated schedules
 * @param unit
 *            the time unit of {@code initialDelay} and {@code period}
 * @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
 */
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
    final long periodInNanos = unit.toNanos(period);
    final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
    final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
    final Action0 recursiveAction = new Action0() {
        long count = 0;
        @Override
        public void call() {
            if (!mas.isUnsubscribed()) {
                action.call();
                long nextTick = startInNanos + (++count * periodInNanos);
                mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
            }
        }
    };
    // Put a placeholder into mas BEFORE scheduling. If the first run completes before
    // schedule(...) returns and stores its follow-up subscription in mas, assigning the
    // returned subscription to the placeholder does not overwrite that follow-up, so
    // unsubscribing mas still cancels the live repetition.
    final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
    mas.set(first);
    first.set(schedule(recursiveAction, initialDelay, unit));
    return mas;
}