/**
 * Handles one inbound message on the WebSocket client channel.
 *
 * <p>While the handshake is still pending, the message is cast to a
 * {@link FullHttpResponse} and used to finish the handshake; a frame
 * aggregator is then inserted ahead of this handler and the subscriber is
 * started. Once the handshake is complete, a further {@link FullHttpResponse}
 * is rejected, ping frames are answered with a pong carrying the same
 * content, and binary frames are decoded into an {@link Envelope} that is
 * passed to the subscriber. Any other frame type is ignored.
 *
 * @param context the handler context the message arrived on
 * @param message the inbound message; cast without a prior type check, so an
 *                unexpected type surfaces as a {@link ClassCastException}
 * @throws IllegalStateException if a {@link FullHttpResponse} arrives after
 *                               the handshake has completed
 * @throws Exception if finishing the handshake or decoding the envelope fails
 */
@Override
protected void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
    final Channel ch = context.channel();

    // Handshake phase: the message is used as the server's upgrade response.
    if (!handshaker.isHandshakeComplete()) {
        final FullHttpResponse upgradeResponse = (FullHttpResponse) message;
        handshaker.finishHandshake(ch, upgradeResponse);

        // Upper bound handed to the aggregator placed in front of this handler.
        final int maxAggregatedBytes = 64 * 1024;
        ch.pipeline().addBefore(
                HANDLER_NAME,
                "websocket-frame-aggregator",
                new WebSocketFrameAggregator(maxAggregatedBytes));

        subscriber.onStart();
        return;
    }

    // Past the handshake an HTTP response is not expected any more.
    if (message instanceof FullHttpResponse) {
        final FullHttpResponse unexpected = (FullHttpResponse) message;
        final String description = "Unexpected FullHttpResponse (getStatus=" + unexpected.getStatus()
                + ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')';
        throw new IllegalStateException(description);
    }

    final WebSocketFrame frame = (WebSocketFrame) message;

    if (frame instanceof PingWebSocketFrame) {
        final PingWebSocketFrame ping = (PingWebSocketFrame) frame;
        // The frame is retained before its content is reused for the pong.
        // NOTE(review): presumably because the inbound frame is released once
        // this method returns - confirm against the handler's base class.
        context.writeAndFlush(new PongWebSocketFrame(ping.retain().content()));
        return;
    }

    if (!(frame instanceof BinaryWebSocketFrame)) {
        // Neither ping nor binary: nothing to do.
        return;
    }

    final BinaryWebSocketFrame binary = (BinaryWebSocketFrame) frame;
    final ByteBufInputStream payload = new ByteBufInputStream(binary.content());
    final Envelope decoded = Envelope.ADAPTER.decode(payload);
    subscriber.onNext(decoded);
}
NettyFirehoseOnSubscribe.java 文件源码
java
阅读 31
收藏 0
点赞 0
评论 0
项目:snotel
作者:
评论列表
文章目录