/**
 * Creates the client's ReactiveSocket via {@code ReactiveSocket.createRequestor()} and
 * connects it through a {@link DuplexConnection} adapter backed by the given WebSocket
 * connection. The value returned by {@code connect(...)} is stored in {@code connect}.
 *
 * <p>Inbound: each item from {@code wsConn.getInput()} is converted with
 * {@code Frame.from(content().nioBuffer())}. Outbound: each {@code Frame} is wrapped in a
 * {@code BinaryWebSocketFrame} around its byte buffer and written with a flush per item.
 *
 * @param wsConn the WebSocket connection used for both reading and writing frames
 */
private ReactiveSocketWebSocketClient(WebSocketConnection wsConn) {
    this.reactiveSocket = ReactiveSocket.createRequestor();

    // Adapter exposing the WebSocket connection as a ReactiveSocket DuplexConnection.
    DuplexConnection duplex = new DuplexConnection() {
        @Override
        public Publisher<Frame> getInput() {
            // The Frame is built over the NIO view of the WebSocket frame's content.
            return toPublisher(
                wsConn.getInput().map(wsFrame -> Frame.from(wsFrame.content().nioBuffer())));
        }

        @Override
        public Publisher<Void> addOutput(Publisher<Frame> o) {
            // had to use writeAndFlushOnEach instead of write for frames to get through
            // TODO determine if that's expected or not
            return toPublisher(wsConn.writeAndFlushOnEach(
                toObservable(o).map(outbound ->
                    new BinaryWebSocketFrame(Unpooled.wrappedBuffer(outbound.getByteBuffer())))));
        }
    };

    this.connect = this.reactiveSocket.connect(duplex);
}
ReactiveSocketWebSocketClient.java 文件源码
java
阅读 33
收藏 0
点赞 0
评论 0
项目:reactivesocket-websocket-rxnetty
作者:
评论列表
文章目录