HystrixMetricsStreamHandler.java 文件源码

java
阅读 27 收藏 0 点赞 0 评论 0

项目:Grapi 作者:
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    if (!msg.uri().startsWith(urlMapping)) {
        ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        return;
    }

    logger.debug("Handling Hystrix stream request...");
    final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    response.headers().set(CONTENT_TYPE, "text/event-stream;charset=UTF-8");
    response.headers().set(CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
    response.headers().add(PRAGMA, NO_CACHE);
    ctx.writeAndFlush(response);

    final Subject<Void, Void> subject = PublishSubject.create();
    final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
    Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long tick) {
                    if (!ctx.channel().isOpen()) {
                        subscription.unsubscribe();
                        logger.debug("Stopping Hystrix Turbine stream to connection");
                        return;
                    }
                    try {
                        Collection<HystrixCommandMetrics> hystrixCommandMetrics = HystrixCommandMetrics.getInstances();
                        Collection<HystrixThreadPoolMetrics> hystrixThreadPoolMetrics = HystrixThreadPoolMetrics.getInstances();
                        logger.debug("Found {} hystrix command metrics", hystrixCommandMetrics.size());
                        logger.debug("Found {} hystrix thread pool metrics", hystrixThreadPoolMetrics.size());
                        for (HystrixCommandMetrics commandMetrics : hystrixCommandMetrics) {
                            writeMetric(toJson(commandMetrics), ctx);
                        }
                        for (HystrixThreadPoolMetrics threadPoolMetrics : hystrixThreadPoolMetrics) {
                            writeMetric(toJson(threadPoolMetrics), ctx);
                        }
                        if (hystrixCommandMetrics.isEmpty() && hystrixThreadPoolMetrics.isEmpty()) {
                            ctx.writeAndFlush(PING.duplicate()).addListener(CLOSE_ON_FAILURE);
                        } else {
                            ctx.flush();
                        }
                    } catch (Exception e) {
                        logger.error("Unexpected error", e);
                        subject.onError(e);
                    }
                }
            });
    subscription.set(actionSubscription);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号