ReactiveSocketWebSocketServer.java source code

java

Project: reactivesocket-websocket-rxnetty
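  /**
   * Use this method as the RxNetty HttpServer WebSocket handler.
   *
   * @param ws the WebSocket connection to bridge to ReactiveSocket
   * @return the result of {@code reactiveSocket.connect(...)}, exposed as an {@code Observable<Void>}
   */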
  public Observable<Void> acceptWebsocket(WebSocketConnection ws) {
    return toObservable(reactiveSocket.connect(new DuplexConnection() {
      @Override
      public Publisher<Frame> getInput() {
        return toPublisher(ws.getInput().map(frame -> {
          // TODO is this copying bytes?
          try {
            return Frame.from(frame.content().nioBuffer());
          } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
          }
        }));
      }

      @Override
      public Publisher<Void> addOutput(Publisher<Frame> o) {
        // Had to use writeAndFlushOnEach instead of write for frames to reliably get through.
        // TODO determine if that's expected or not
        return toPublisher(ws.writeAndFlushOnEach(toObservable(o).map(frame -> {
          return new BinaryWebSocketFrame(Unpooled.wrappedBuffer(frame.getByteBuffer()));
        })));
      }
    }));
  }
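
The TODO in getInput() asks whether converting the frame content to a ByteBuffer copies bytes. Netty's documentation for ByteBuf.nioBuffer() leaves open whether the returned buffer shares or copies the content, so one way to find out is to probe at runtime. The following is a minimal sketch of such a probe using only java.nio, not Netty. The class and method names (BufferSharingSketch, view, copy, sharesMemory) are hypothetical and do not come from the project. The probe assumes both buffers are writable, non-empty, and start at the same logical byte.

```java
import java.nio.ByteBuffer;

public class BufferSharingSketch {

    /** Returns a view over the same backing bytes (no copy is made). */
    static ByteBuffer view(ByteBuffer src) {
        return src.duplicate();
    }

    /** Returns an independent copy of the remaining bytes of src. */
    static ByteBuffer copy(ByteBuffer src) {
        ByteBuffer dst = ByteBuffer.allocate(src.remaining());
        dst.put(src.duplicate()); // duplicate() leaves src's position untouched
        dst.flip();
        return dst;
    }

    /**
     * Probes whether candidate shares memory with original by flipping the
     * first readable byte of original and checking whether candidate sees it.
     * The original byte is restored before returning.
     */
    static boolean sharesMemory(ByteBuffer original, ByteBuffer candidate) {
        int i = original.position();
        int j = candidate.position();
        byte before = original.get(i);
        original.put(i, (byte) (before ^ 0xFF)); // always differs from 'before'
        boolean shared = candidate.get(j) != before;
        original.put(i, before); // restore the original content
        return shared;
    }

    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        System.out.println(sharesMemory(original, view(original))); // prints "true"
        System.out.println(sharesMemory(original, copy(original))); // prints "false"
    }
}
```

The same probe idea could be applied to a Netty ByteBuf and the ByteBuffer obtained from it, though whether that is safe for pooled or read-only buffers would need to be checked separately.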