/**
 * Builds an {@link AsyncRpcClient} whose channels fail every outbound write.
 *
 * <p>The supplied configuration is first stored via {@code setConf}. Each new channel then gets
 * an outbound handler at the front of its pipeline that fails the write promise with a
 * {@link RuntimeException} carrying the message "Injected fault" and does not forward the
 * message any further.
 *
 * @param conf configuration stored on this instance and handed to the client
 * @return a client wired with the fault-injecting channel initializer
 */
@Override
protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
  setConf(conf);

  ChannelInitializer<SocketChannel> faultInjectingInitializer =
      new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          // A fresh handler per channel, installed ahead of every other handler.
          ChannelOutboundHandlerAdapter failingWriter = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                throws Exception {
              // Never forward the message; report the write as failed instead.
              RuntimeException injected = new RuntimeException("Injected fault");
              promise.setFailure(injected);
            }
          };
          ch.pipeline().addFirst(failingWriter);
        }
      };

  return new AsyncRpcClient(conf, faultInjectingInitializer);
}
Java 类 io.netty.channel.ChannelOutboundHandlerAdapter 的实例源码
TestAsyncIPC.java 文件源码
项目:ditb
阅读 32
收藏 0
点赞 0
评论 0
SseOverHttpServerPipelineConfigurator.java 文件源码
项目:RxNetty
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Configures a server pipeline for Server-Sent Events over HTTP.
 *
 * <p>Delegates to the wrapped {@code serverPipelineConfigurator} first, then appends the SSE
 * encoder and an outbound handler that sets the {@code CONTENT_TYPE} header to
 * {@code text/event-stream} on any outgoing {@link HttpServerResponse} that has no value for it.
 *
 * @param pipeline the pipeline to configure
 */
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
  serverPipelineConfigurator.configureNewPipeline(pipeline);
  pipeline.addLast(SSE_ENCODER_HANDLER_NAME, SERVER_SENT_EVENT_ENCODER);
  pipeline.addLast(SSE_RESPONSE_HEADERS_COMPLETER, new ChannelOutboundHandlerAdapter() {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
        throws Exception {
      // instanceof is the idiomatic (and null-safe) type test; the wildcard cast avoids a raw type.
      if (msg instanceof HttpServerResponse) {
        HttpServerResponse<?> rxResponse = (HttpServerResponse<?>) msg;
        // Only fill in the content type when the application has not chosen one itself.
        if (rxResponse.getHeaders().get(CONTENT_TYPE) == null) {
          rxResponse.getHeaders().set(CONTENT_TYPE, "text/event-stream");
        }
      }
      // Always pass the (possibly amended) message on to the next outbound handler.
      super.write(ctx, msg, promise);
    }
  });
}
VerifyCornerCasesComponentTest.java 文件源码
项目:riposte
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Verifies that a request whose request line has had its HTTP version stripped results in
 * the {@code MALFORMED_REQUEST} error with an "Invalid HTTP request" cause.
 */
@Test
public void invalid_http_call_should_result_in_expected_400_error() throws Exception {
  // given
  // Normal request, but fiddle with the first chunk as it's going out to remove the HTTP version
  // and make it an invalid HTTP call.
  NettyHttpClientRequestBuilder request = request()
      .withMethod(HttpMethod.GET)
      .withUri(BasicEndpoint.MATCHING_PATH)
      .withPipelineAdjuster(
          p -> p.addFirst(new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                throws Exception {
              // Only byte buffers can be inspected; anything else is forwarded untouched
              // instead of failing with a ClassCastException.
              if (msg instanceof ByteBuf) {
                ByteBuf original = (ByteBuf) msg;
                String msgAsString = original.toString(CharsetUtil.UTF_8);
                if (msgAsString.contains("HTTP/1.1")) {
                  msg = Unpooled.copiedBuffer(
                      msgAsString.replace("HTTP/1.1", ""), CharsetUtil.UTF_8);
                  // The original buffer is no longer forwarded, so this handler is its last
                  // owner and must release it to avoid a buffer leak.
                  original.release();
                }
              }
              super.write(ctx, msg, promise);
            }
          })
      );

  // when
  NettyHttpClientResponse response =
      request.execute(downstreamServerConfig.endpointsPort(), 3000);

  // then
  verifyErrorReceived(response.payload,
                      response.statusCode,
                      new ApiErrorWithMetadata(SampleCoreApiError.MALFORMED_REQUEST,
                                               Pair.of("cause", "Invalid HTTP request"))
  );
}