/**
* Use this method as the RxNetty HttpServer WebSocket handler.
*
* @param ws
* @return
*/
public Observable<Void> acceptWebsocket(WebSocketConnection ws) {
return toObservable(reactiveSocket.connect(new DuplexConnection() {
@Override
public Publisher<Frame> getInput() {
return toPublisher(ws.getInput().map(frame -> {
// TODO is this copying bytes?
try {
return Frame.from(frame.content().nioBuffer());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}));
}
@Override
public Publisher<Void> addOutput(Publisher<Frame> o) {
// had to use writeAndFlushOnEach instead of write for frames to reliably get through
// TODO determine if that's expected or not
return toPublisher(ws.writeAndFlushOnEach(toObservable(o).map(frame -> {
return new BinaryWebSocketFrame(Unpooled.wrappedBuffer(frame.getByteBuffer()));
})));
}
}));
}
ReactiveSocketWebSocketServer.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:reactivesocket-websocket-rxnetty
作者:
评论列表
文章目录