/**
 * Encodes a Thrift request and writes it to the channel.
 *
 * <p>Oneway requests are sent with {@code ONEWAY_SEQUENCE_ID} and are not registered in
 * {@code pendingRequests}. Every other request is assigned the next sequence id and is
 * registered in {@code pendingRequests} before the write is issued.
 *
 * <p>If another request is already registered under the same sequence id, the new request is
 * failed through {@code RequestHandler.onChannelError} and is <em>not</em> written: the encoded
 * buffer is released and {@code promise} is failed with the same exception.
 *
 * @param context handler context supplying the executor, the buffer allocator and the write
 * @param thriftRequest the request to send
 * @param promise promise handed to the channel write; failed directly when the request is
 *        rejected before being written
 * @throws Exception if preparing the request (timeout registration or encoding) fails; a
 *         failure thrown by the write call itself is passed to {@code onError} instead
 */
private void sendMessage(ChannelHandlerContext context, ThriftRequest thriftRequest, ChannelPromise promise)
        throws Exception
{
    // todo ONEWAY_SEQUENCE_ID is a header protocol thing... make sure this works with framed and unframed
    int sequenceId = thriftRequest.isOneway() ? ONEWAY_SEQUENCE_ID : this.sequenceId.incrementAndGet();
    RequestHandler requestHandler = new RequestHandler(thriftRequest, sequenceId);

    // register timeout
    requestHandler.registerRequestTimeout(context.executor());

    // encode request
    ByteBuf requestBuffer = requestHandler.encodeRequest(context.alloc());

    // register request if we are expecting a response
    if (!thriftRequest.isOneway() && pendingRequests.putIfAbsent(sequenceId, requestHandler) != null) {
        TTransportException duplicate = new TTransportException("Another request with the same sequenceId is already in progress");
        requestHandler.onChannelError(duplicate);

        // This request has already been failed and the map entry belongs to the other request,
        // so it must not go on the wire. The buffer will never reach the channel, so release it
        // here, and complete the promise ourselves because no write will do it.
        requestBuffer.release();
        promise.tryFailure(duplicate);
        return;
    }

    try {
        ChannelFuture sendFuture = context.write(requestBuffer, promise);
        // messageSent is invoked once the write future completes
        sendFuture.addListener(future -> messageSent(context, sendFuture, requestHandler));
    }
    catch (Throwable t) {
        onError(context, t);
    }
}
ThriftClientHandler.java 文件源码
java
阅读 39
收藏 0
点赞 0
评论 0
项目:drift
作者:
评论列表
文章目录