NettyFirehoseOnSubscribe.java 文件源码

java
阅读 31 收藏 0 点赞 0 评论 0

项目:snotel 作者:
@Override
protected void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
    final Channel channel = context.channel();
    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(channel, (FullHttpResponse) message);
        channel.pipeline().addBefore(HANDLER_NAME, "websocket-frame-aggregator", new WebSocketFrameAggregator(64 * 1024));
        subscriber.onStart();
        return;
    }

    if (message instanceof FullHttpResponse) {
        final FullHttpResponse response = (FullHttpResponse) message;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
                        ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
    }

    final WebSocketFrame frame = (WebSocketFrame) message;
    if (frame instanceof PingWebSocketFrame) {
        context.writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame)frame).retain().content()));
    } else if (frame instanceof BinaryWebSocketFrame) {
        final ByteBufInputStream input = new ByteBufInputStream(((BinaryWebSocketFrame)message).content());
        final Envelope envelope = Envelope.ADAPTER.decode(input);
        subscriber.onNext(envelope);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号