/**
 * Dispatches a cluster message to the group handler that matches {@code commandId}.
 *
 * <p>Command ids without a matching case are logged at WARN level. An {@link IOException}
 * raised while obtaining or handling the body is logged at ERROR level and not rethrown.
 *
 * @param commandId the command id to dispatch on
 * @param clusterMessage the cluster message supplying the header and the body
 * @since 1.0
 */
private void doGroup(short commandId, MyClusterMessage clusterMessage) {
    // Log this method's own name; the previous text said "doSwitch" (copy-paste slip).
    logger.debug("MyClusterMessageListener#doGroup");
    IMHeader header = clusterMessage.getHeader();
    try {
        MessageLite body = clusterMessage.getMessage();
        switch (commandId) {
            case GroupCmdID.CID_GROUP_CHANGE_MEMBER_NOTIFY_VALUE:
                groupChangeMemberNotify(header, body);
                break;
            default:
                logger.warn("Unsupport command id {}", commandId);
                break;
        }
    } catch (IOException e) {
        logger.error("decode failed.", e);
    }
}
java类com.google.protobuf.MessageLite的实例源码
MyClusterMessageListener.java 文件源码
项目:sctalk
阅读 30
收藏 0
点赞 0
评论 0
IntentProtocolBufferExtractor.java 文件源码
项目:OpenYOLO-Android
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Reads the byte-array extra named {@code extraName} from {@code intent} and parses it with
 * {@code protoParser}.
 *
 * @param extraName name of the intent extra holding the serialized message
 * @param protoParser parser used to decode the bytes
 * @param failureDescription message used for every {@link MalformedDataException} thrown here
 * @param intent the intent to read from; may be null
 * @return the parsed message
 * @throws MalformedDataException if the intent is null, the extra is missing or not a byte
 *     array, or the protocol buffer could not be parsed
 */
@NonNull
public static <T extends MessageLite> T extract(
        @NonNull String extraName,
        @NonNull Parser<T> protoParser,
        @NonNull String failureDescription,
        @Nullable Intent intent)
        throws MalformedDataException {
    // A missing intent and a missing extra are reported the same way.
    byte[] payload = (intent != null) ? intent.getByteArrayExtra(extraName) : null;
    if (payload == null) {
        throw new MalformedDataException(failureDescription);
    }

    T parsed;
    try {
        parsed = protoParser.parseFrom(payload);
    } catch (IOException parseFailure) {
        throw new MalformedDataException(failureDescription, parseFailure);
    }
    return parsed;
}
IMLoginHandlerImpl.java 文件源码
项目:sctalk
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Handles a push-shield request: passes the requested shield status to the login service and
 * writes the result back on the requesting channel.
 *
 * <p>Any exception is logged. A response is re-sent from the failure path only when both the
 * response header and the response body were fully built; otherwise nothing is written, since
 * a message with a null header or body cannot be encoded.
 *
 * @param header header of the incoming request; a clone of it is used for the response
 * @param body the request, cast to {@code IMPushShieldReq}
 * @param ctx channel context the response is written to
 */
@Override
public void pushShield(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
    IMPushShieldReq pushShieldReq = (IMPushShieldReq) body;
    long userId = super.getUserId(ctx);
    IMHeader resHeader = null;
    IMPushShieldRsp pushShieldRsp = null;
    try {
        BaseModel<Integer> pushShieldRes = loginService.pushShield(userId, pushShieldReq.getShieldStatus());
        pushShieldRsp = IMPushShieldRsp.newBuilder()
                .setUserId(userId)
                .setResultCode(pushShieldRes.getCode())
                .build();
        resHeader = header.clone();
        resHeader.setCommandId((short) LoginCmdID.CID_LOGIN_RES_PUSH_SHIELD_VALUE);
        ctx.writeAndFlush(new IMProtoMessage<>(resHeader, pushShieldRsp));
    } catch (Exception e) {
        logger.error("服务器端异常", e);
        // The failure may have happened before the response was assembled; never write a
        // message whose header or body is still null.
        if (resHeader != null && pushShieldRsp != null) {
            ctx.writeAndFlush(new IMProtoMessage<>(resHeader, pushShieldRsp));
        }
    }
}
DefaultInstanceHandler.java 文件源码
项目:QDrill
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Looks up the default protobuf instance used for responses of the given RPC type.
 *
 * @param rpcType numeric RPC type, compared against the {@code RpcType.*_VALUE} constants
 * @return the default instance of the matching response message
 * @throws UnsupportedOperationException if {@code rpcType} matches none of the handled types
 */
public static MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    final MessageLite defaultInstance;
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            defaultInstance = Ack.getDefaultInstance();
            break;
        case RpcType.HANDSHAKE_VALUE:
            defaultInstance = BitControlHandshake.getDefaultInstance();
            break;
        case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
            defaultInstance = FragmentHandle.getDefaultInstance();
            break;
        case RpcType.RESP_FRAGMENT_STATUS_VALUE:
            defaultInstance = FragmentStatus.getDefaultInstance();
            break;
        case RpcType.RESP_BIT_STATUS_VALUE:
            defaultInstance = BitStatus.getDefaultInstance();
            break;
        case RpcType.RESP_QUERY_STATUS_VALUE:
            defaultInstance = QueryProfile.getDefaultInstance();
            break;
        default:
            throw new UnsupportedOperationException();
    }
    return defaultInstance;
}
DataServer.java 文件源码
项目:QDrill
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Creates the handshake handler for this server. The handler validates the inbound
 * {@code BitClientHandshake} and answers with a {@code BitServerHandshake} carrying this
 * side's RPC version.
 *
 * @param connection the server connection being established (not used by this implementation)
 * @return a handler bound to {@code RpcType.HANDSHAKE} and {@code BitClientHandshake.PARSER}
 */
@Override
protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) {
    return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) {
        @Override
        public MessageLite getHandshakeResponse(BitClientHandshake inbound) throws Exception {
            // logger.debug("Handling handshake from other bit. {}", inbound);
            if (inbound.getRpcVersion() != DataRpcConfig.RPC_VERSION) {
                // "Expected" is our own version and "actual" is what the peer sent; the
                // arguments were previously passed in the opposite order.
                throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
                        DataRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
            }
            if (inbound.getChannel() != RpcChannel.BIT_DATA) {
                throw new RpcException(String.format("Invalid NodeMode. Expected BIT_DATA but received %s.",
                        inbound.getChannel()));
            }
            return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build();
        }
    };
}
MyClusterMessageListener.java 文件源码
项目:sctalk
阅读 41
收藏 0
点赞 0
评论 0
/**
 * Handles a kick-user message: if the addressed user is known to this server, the user's
 * connections of the given client type are kicked.
 *
 * @param body the message body; it is cast to {@code IMServerKickUser}, so a body of any
 *     other type fails with a {@link ClassCastException}
 * @since 1.0 (Li Chunsheng)
 */
private void handleKickUser(MessageLite body) {
    final IMServerKickUser request = (IMServerKickUser) body;
    final long userId = request.getUserId();
    final int clientType = request.getClientType().getNumber();
    final int reason = request.getReason();
    logger.debug("HandleKickUser, userId={}, clientType={}, reason={}", userId, clientType, reason);

    final ClientUser target = ClientUserManager.getUserById(userId);
    if (target == null) {
        // No such user registered here; nothing to kick.
        return;
    }
    // Kick the user's connections that share the given client type.
    target.kickSameClientType(clientType, reason, null);
}
UserClient.java 文件源码
项目:QDrill
阅读 42
收藏 0
点赞 0
评论 0
/**
 * Looks up the default protobuf instance used for responses of the given RPC type.
 *
 * @param rpcType numeric RPC type, compared against the {@code RpcType.*_VALUE} constants
 * @return the default instance of the matching response message
 * @throws RpcException if {@code rpcType} matches none of the handled types
 */
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();

        case RpcType.HANDSHAKE_VALUE:
            return BitToUserHandshake.getDefaultInstance();

        case RpcType.QUERY_HANDLE_VALUE:
            return QueryId.getDefaultInstance();

        case RpcType.QUERY_RESULT_VALUE:
            return QueryResult.getDefaultInstance();

        case RpcType.QUERY_DATA_VALUE:
            return QueryData.getDefaultInstance();

        default:
            throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
    }
}
ProtoListUtil.java 文件源码
项目:OpenYOLO-Android
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Reads a list of protos, using the provided parser, from the provided input stream.
 *
 * <p>The stream is expected to start with a 4-byte message count (as read by
 * {@link DataInputStream#readInt()}) followed by that many delimited messages.
 *
 * @param stream the stream to read from
 * @param parser the parser used for each delimited message
 * @return the parsed messages, in stream order
 * @throws IOException if the proto list could not be parsed: the count is negative, the
 *     stream ends before all announced messages were read, or a message is malformed
 */
public static <T extends MessageLite> List<T> readMessageList(
        InputStream stream,
        Parser<T> parser)
        throws IOException {
    DataInputStream dis = new DataInputStream(stream);
    int messageCount = dis.readInt();
    if (messageCount < 0) {
        throw new IOException("Invalid proto list: negative message count " + messageCount);
    }

    // The count comes from the stream and may be corrupt; cap the pre-sized capacity so a
    // bogus value cannot force a huge allocation. The list still grows as needed.
    ArrayList<T> messages = new ArrayList<>(Math.min(messageCount, 1024));
    for (int i = 0; i < messageCount; i++) {
        T message = parser.parseDelimitedFrom(stream);
        if (message == null) {
            // parseDelimitedFrom signals end-of-stream with null rather than an exception.
            throw new IOException("Invalid proto list: expected " + messageCount
                    + " messages but the stream ended after " + i);
        }
        messages.add(message);
    }
    return messages;
}
ProtobufParseMap.java 文件源码
项目:sctalk
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Parses {@code bytes} into a message object, choosing the parser registered for the given
 * service id and command id.
 *
 * @param serviceId the service id
 * @param commandId the command id
 * @param bytes the serialized protobuf to be parsed
 * @return the parsed message object
 * @throws IOException if no parser is registered for the service or for the command, or if
 *     the selected parser fails
 * @since 1.0
 */
public static MessageLite getMessage(final int serviceId, final int commandId, final byte[] bytes)
        throws IOException {
    final Map<Integer, ProtobufParseMap.Parsing> commandParsers = parseServiceMap.get(serviceId);
    if (commandParsers == null) {
        throw new IOException("UnKnown Protocol service: " + serviceId);
    }

    final ProtobufParseMap.Parsing commandParser = commandParsers.get(commandId);
    if (commandParser == null) {
        throw new IOException(
                "UnKnown Protocol commandId: service=" + serviceId + ",command=" + commandId);
    }

    return commandParser.process(bytes);
}
RpcCompatibilityEncoder.java 文件源码
项目:dremio-oss
阅读 35
收藏 0
点赞 0
评论 0
/**
 * Passes outbound messages through, except failure responses whose body is a
 * {@code DremioPBError}: those are re-created with the error run through
 * {@code ErrorCompatibility.convertIfNecessary}.
 *
 * @param context the channel handler context (not used here)
 * @param message the outbound message to inspect
 * @param out receives exactly one message: the original or the rewritten copy
 */
@Override
protected void encode(ChannelHandlerContext context, OutboundRpcMessage message, List<Object> out) throws Exception {
    final boolean isFailureResponse = message.mode == RpcMode.RESPONSE_FAILURE;
    if (!isFailureResponse || !(message.pBody instanceof DremioPBError)) {
        // Nothing to convert; forward untouched.
        out.add(message);
        return;
    }

    final DremioPBError converted = ErrorCompatibility.convertIfNecessary((DremioPBError) message.pBody);
    out.add(new OutboundRpcMessage(message.mode, message.rpcType, message.coordinationId, converted, message.dBodies));
}
HandlerManager.java 文件源码
项目:sctalk
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Dispatches a File-service message to the matching file handler method.
 *
 * <p>Nothing is dispatched when {@code hasLogin(ctx)} is false. Command ids without a
 * matching case are logged at WARN level.
 *
 * @param ctx channel context
 * @param commandId command id to dispatch on
 * @param header message header
 * @param body message body
 * @since 1.0
 */
public void doFile(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    // Only channels that have logged in are served.
    if (hasLogin(ctx)) {
        switch (commandId) {
            case FileCmdID.CID_FILE_REQUEST_VALUE: {
                imFileHandle.fileReq(header, body, ctx);
                break;
            }
            case FileCmdID.CID_FILE_HAS_OFFLINE_REQ_VALUE: {
                imFileHandle.hasOfflineReq(header, body, ctx);
                break;
            }
            case FileCmdID.CID_FILE_ADD_OFFLINE_REQ_VALUE: {
                imFileHandle.addOfflineReq(header, body, ctx);
                break;
            }
            case FileCmdID.CID_FILE_DEL_OFFLINE_REQ_VALUE: {
                imFileHandle.delOfflineReq(header, body, ctx);
                break;
            }
            default: {
                logger.warn("Unsupport command id {}", commandId);
                break;
            }
        }
    }
}
MyClusterMessageListener.java 文件源码
项目:sctalk
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Dispatches a cluster message to the switch-service handler that matches {@code commandId}.
 *
 * <p>Command ids without a matching case are logged at WARN level. An {@link IOException}
 * raised while obtaining or handling the body is logged at ERROR level and not rethrown.
 *
 * @param commandId the command id to dispatch on
 * @param clusterMessage the cluster message supplying the header and the body
 * @since 1.0
 */
private void doSwitch(short commandId, MyClusterMessage clusterMessage) {
    logger.debug("MyClusterMessageListener#doSwitch");
    IMHeader header = clusterMessage.getHeader();
    try {
        MessageLite body = clusterMessage.getMessage();
        switch (commandId) {
            case SwitchServiceCmdID.CID_SWITCH_P2P_CMD_VALUE:
                switchP2p(header, body);
                // Without this break a handled P2P command fell through to the default
                // branch and was also reported as unsupported.
                break;
            default:
                logger.warn("Unsupport command id {}", commandId);
                break;
        }
    } catch (IOException e) {
        logger.error("decode failed.", e);
    }
}
UserClient.java 文件源码
项目:dremio-oss
阅读 54
收藏 0
点赞 0
评论 0
/**
 * Looks up the default protobuf instance used for responses of the given RPC type.
 *
 * @param rpcType numeric RPC type, compared against the {@code RpcType.*_VALUE} constants
 * @return the default instance of the matching response message
 * @throws RpcException if {@code rpcType} matches none of the handled types
 */
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        // Connection-level messages.
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitToUserHandshake.getDefaultInstance();

        // Query messages.
        case RpcType.QUERY_HANDLE_VALUE:
            return QueryId.getDefaultInstance();
        case RpcType.QUERY_RESULT_VALUE:
            return QueryResult.getDefaultInstance();
        case RpcType.QUERY_DATA_VALUE:
            return QueryData.getDefaultInstance();
        case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
            return QueryPlanFragments.getDefaultInstance();

        // Metadata and prepared-statement messages.
        case RpcType.CATALOGS_VALUE:
            return GetCatalogsResp.getDefaultInstance();
        case RpcType.SCHEMAS_VALUE:
            return GetSchemasResp.getDefaultInstance();
        case RpcType.TABLES_VALUE:
            return GetTablesResp.getDefaultInstance();
        case RpcType.COLUMNS_VALUE:
            return GetColumnsResp.getDefaultInstance();
        case RpcType.PREPARED_STATEMENT_VALUE:
            return CreatePreparedStatementResp.getDefaultInstance();
        case RpcType.SERVER_META_VALUE:
            return GetServerMetaResp.getDefaultInstance();

        default:
            throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
    }
}
ProtoConnection.java 文件源码
项目:s-store
阅读 32
收藏 0
点赞 0
评论 0
/** Attempts to read a buffered message from the underlying connection. This is more efficient
* than attempting to actually read from the underlying connection for each message, which ends
* up making a final "empty" read from the non-blocking connection, rather than simply
* consuming all buffered data.
*
* Each message is preceded by a 4-byte little-endian length. The length is remembered in
* nextMessageLength across calls, so a call that finds the prefix but not the whole body can
* return false and a later call resumes without re-reading the prefix.
*
* TODO: It would be ideal if there was a way to consume data as we go, instead of buffering
* it all then consuming it. However, this only matters for streams of medium-sized messages
* with a huge backlog, which should be rare? The C++ implementation has a similar issue.
*
* @param builder message builder to be parsed
* @return true if a message was read, false if there is not enough buffered data to read a
* message.
* @throws RuntimeException wrapping any IOException raised while reading or parsing
*/
public boolean readBufferedMessage(MessageLite.Builder builder) {
try {
// -1 means the length prefix of the next message has not been read yet.
if (nextMessageLength == -1) {
// Not even the 4-byte length prefix is buffered.
if (connection.available() < 4) {
return false;
}
input.setLimit(4);
nextMessageLength = codedInput.readRawLittleEndian32();
}
assert nextMessageLength >= 0;
// Wait until the whole message body is buffered; nextMessageLength is kept for the next call.
if (connection.available() < nextMessageLength) {
assert 0 <= connection.available() && connection.available() < nextMessageLength;
return false;
}
// Parse the response for the next RPC
// TODO: Add .available() to CodedInputStream to avoid many copies to internal buffer?
// or make CodedInputStream wrap a non-blocking interface like C++?
input.setLimit(nextMessageLength);
builder.mergeFrom(codedInput);
assert codedInput.isAtEnd();
codedInput.resetSizeCounter();
// Message consumed: the next call must start with a fresh length prefix.
nextMessageLength = -1;
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
LibraMessage.java 文件源码
项目:LibraSock
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Builds the {@code LibraMessage} wrapping {@code msg}: the head is created from {@code uid}
 * and the module id registered for the message's class name.
 *
 * @param uid user id placed in the message head
 * @param msg protobuf message used as the body
 * @return the assembled message
 * @throws Exception if no module id is registered for the class of {@code msg}
 */
public static LibraMessage createLibraMessage(long uid, MessageLite msg) throws Exception {
    final String messageClassName = msg.getClass().getName();
    final Integer moduleId = handlerMgr.searchModuleIdByClass(messageClassName);
    if (moduleId != null) {
        final LibraHead head = LibraHead.createHead(uid, moduleId);
        final LibraMessage assembled = new LibraMessage();
        assembled.setHead(head);
        assembled.setBody(msg);
        return assembled;
    }

    // Unknown message class: log the offending message, then fail.
    LibraLog.error("protocolId is null,msg =:" + JsonUtil.ObjectToJsonString(msg));
    throw new Exception("LibraEncoder.encodeHeader >>> protocolId is null");
}
ProtocolBuilder.java 文件源码
项目:dremio-oss
阅读 42
收藏 0
点赞 0
评论 0
/**
 * Registers a receive handler under the given request id and returns a creator for the
 * matching send endpoints.
 *
 * @param id request id; must be between 0 and 2047 and not already registered
 * @param handler handler for requests with this id; must not be null
 * @return an endpoint creator built from the id, the handler's default response class and the
 *     configured timeout
 * @throws IllegalArgumentException if {@code id} is out of range or already registered
 * @throws NullPointerException if {@code handler} is null
 */
@SuppressWarnings("unchecked")
public <REQUEST extends MessageLite, RESPONSE extends MessageLite> SendEndpointCreator<REQUEST, RESPONSE> register(
        int id,
        ReceiveHandler<REQUEST, RESPONSE> handler) {
    Preconditions.checkArgument(id > -1 && id < 2048, "A request id must be between 0 and 2047.");
    Preconditions.checkNotNull(handler);
    // Guava's Preconditions templates only substitute %s; with %d the id was never inserted
    // into the message.
    Preconditions.checkArgument(!handlers.containsKey(id), "Only a single handler can be registered per id. You tried to register a handler for id %s twice.", id);
    handlers.put(id, (ReceiveHandler<MessageLite, MessageLite>) handler);
    return new EndpointCreator<REQUEST, RESPONSE>(proxyFactory, new PseudoEnum(id), (Class<RESPONSE>) handler.getDefaultResponse().getClass(), timeoutMillis);
}
ControlServer.java 文件源码
项目:QDrill
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Creates the handshake handler for this control server. The handler validates the inbound
 * {@code BitControlHandshake}, records the peer endpoint on the connection, registers the
 * connection with the endpoint's connection manager and answers with this side's RPC version.
 *
 * @param connection the control connection being established
 * @return a handler bound to {@code RpcType.HANDSHAKE} and {@code BitControlHandshake.PARSER}
 */
@Override
protected ServerHandshakeHandler<BitControlHandshake> getHandshakeHandler(final ControlConnection connection) {
    return new ServerHandshakeHandler<BitControlHandshake>(RpcType.HANDSHAKE, BitControlHandshake.PARSER) {
        @Override
        public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception {
            // logger.debug("Handling handshake from other bit. {}", inbound);
            if (inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
                // "Expected" is our own version and "actual" is what the peer sent; the
                // arguments were previously passed in the opposite order.
                throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", ControlRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
            }
            if (!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) {
                throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint()));
            }
            connection.setEndpoint(inbound.getEndpoint());

            // Look up the connection manager responsible for the peer endpoint.
            ControlConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());

            // update the close handler.
            proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));

            // add to the connection manager.
            manager.addExternalConnection(connection);

            return BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).build();
        }
    };
}
TestFabric.java 文件源码
项目:dremio-oss
阅读 38
收藏 0
点赞 0
评论 0
/**
 * Returns the default response instance for the given RPC type.
 *
 * @param rpcType numeric RPC type; only 1 and 2 are supported
 * @return {@code NodeEndpoint.getDefaultInstance()} for types 1 and 2
 * @throws UnsupportedOperationException for any other type
 */
@Override
public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    if (rpcType == 1 || rpcType == 2) {
        return NodeEndpoint.getDefaultInstance();
    }
    throw new UnsupportedOperationException();
}
OutboundRpcMessage.java 文件源码
项目:QDrill
阅读 33
收藏 0
点赞 0
评论 0
/**
 * Creates an outbound message with a protobuf body and optional data buffers. Buffers with no
 * readable bytes are released immediately and left out of {@code dBodies}.
 *
 * @param mode RPC mode of the message
 * @param rpcTypeNumber numeric RPC type
 * @param coordinationId coordination id of the message
 * @param pBody protobuf body
 * @param dBodies data buffers to send along with the body
 */
OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
    super(mode, rpcTypeNumber, coordinationId);
    this.pBody = pBody;

    // Netty doesn't traditionally release the reference on an unreadable buffer, so empty
    // buffers are released here; otherwise sending empty vectors leaks memory.
    List<ByteBuf> readable = Lists.newArrayList();
    for (ByteBuf candidate : dBodies) {
        if (candidate.readableBytes() != 0) {
            readable.add(candidate);
            continue;
        }
        candidate.release();
    }
    this.dBodies = readable.toArray(new ByteBuf[readable.size()]);
}
DataDefaultInstanceHandler.java 文件源码
项目:QDrill
阅读 40
收藏 0
点赞 0
评论 0
/**
 * Looks up the default response instance for the given RPC type (client variant).
 *
 * @param rpcType numeric RPC type, compared against the {@code RpcType.*_VALUE} constants
 * @return the default {@code Ack} or {@code BitServerHandshake} instance
 * @throws UnsupportedOperationException if {@code rpcType} is neither ACK nor HANDSHAKE
 */
public static MessageLite getResponseDefaultInstanceClient(int rpcType) throws RpcException {
    if (rpcType == RpcType.ACK_VALUE) {
        return Ack.getDefaultInstance();
    }
    if (rpcType == RpcType.HANDSHAKE_VALUE) {
        return BitServerHandshake.getDefaultInstance();
    }
    throw new UnsupportedOperationException();
}
DataDefaultInstanceHandler.java 文件源码
项目:QDrill
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Looks up the default response instance for the given RPC type (server variant).
 *
 * @param rpcType numeric RPC type, compared against the {@code RpcType.*_VALUE} constants
 * @return the default {@code Ack}, {@code BitClientHandshake} or {@code FragmentRecordBatch}
 *     instance
 * @throws UnsupportedOperationException if {@code rpcType} matches none of the handled types
 */
public static MessageLite getResponseDefaultInstanceServer(int rpcType) throws RpcException {
    if (rpcType == RpcType.ACK_VALUE) {
        return Ack.getDefaultInstance();
    }
    if (rpcType == RpcType.HANDSHAKE_VALUE) {
        return BitClientHandshake.getDefaultInstance();
    }
    if (rpcType == RpcType.REQ_RECORD_BATCH_VALUE) {
        return FragmentRecordBatch.getDefaultInstance();
    }
    throw new UnsupportedOperationException();
}
UserServer.java 文件源码
项目:QDrill
阅读 41
收藏 0
点赞 0
评论 0
/**
 * Returns the default response instance for the given RPC type. A user server only expects
 * acknowledgments on messages it creates, so ACK is the only supported type.
 *
 * @param rpcType numeric RPC type
 * @return the default {@code Ack} instance
 * @throws UnsupportedOperationException if {@code rpcType} is not {@code RpcType.ACK_VALUE}
 */
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    if (rpcType == RpcType.ACK_VALUE) {
        return Ack.getDefaultInstance();
    }
    throw new UnsupportedOperationException();
}
ReconnectingConnection.java 文件源码
项目:dremio-oss
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Runs {@code cmd} against the current connection, opening a new one when none is held.
 *
 * <p>If this object has been closed, the command is told the connection failed and nothing
 * else happens. If an active connection is held, the command runs on it immediately.
 * Otherwise connection set-up is serialized on this object's monitor.
 *
 * @param cmd the command to run; notified through {@code connectionAvailable} or
 *     {@code connectionFailed}
 */
public <R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> void runCommand(C cmd) {
    // logger.info(String.format("Running command %s sending to host %s:%d", cmd, host, port));
    if (closed.get()) {
        cmd.connectionFailed(FailureType.CONNECTION, new IOException("Connection has been closed"));
        // Stop here: previously execution continued and could still hand the command a
        // connection (or open a new one) after it had been told the connection failed.
        return;
    }

    CONNECTION_TYPE connection = connectionHolder.get();
    if (connection != null) {
        if (connection.isActive()) {
            cmd.connectionAvailable(connection);
            // logger.info("Connection available and active, command run inline.");
            return;
        } else {
            // remove the old connection. (don't worry if we fail since someone else should have done it.)
            connectionHolder.compareAndSet(connection, null);
        }
    }

    // We've arrived here without a connection, let's make sure only one of us makes a
    // connection. (fyi, another endpoint could create a reverse connection)
    synchronized (this) {
        connection = connectionHolder.get();
        if (connection != null) {
            cmd.connectionAvailable(connection);
        } else {
            logger.info("[{}]: No connection active, opening new connection to {}:{}.", name, host, port);
            BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient();
            ConnectionListeningFuture<R, C> future = new ConnectionListeningFuture<R, C>(cmd);
            client.connectAsClient(future, handshake, host, port);
            // logger.info("Connection available and active, command now being run inline.");
            future.waitAndRun();
            // logger.info("Connection available. Command now run.");
        }
    }
}
BroadcastQueryClient.java 文件源码
项目:OpenYOLO-Android
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Dispatches a query for {@code dataType} using the {@link #DEFAULT_TIMEOUT_MS default timeout}.
 * Delegates to the overload that takes an explicit timeout.
 *
 * @param dataType the data type being queried
 * @param queryMessage optional protocol buffer message carried by the query; may be null
 * @param callback receives the response to this query
 */
public void queryFor(
        @NonNull String dataType,
        @Nullable MessageLite queryMessage,
        @NonNull QueryCallback callback) {
    queryFor(dataType, queryMessage, DEFAULT_TIMEOUT_MS, callback);
}
BroadcastQueryClient.java 文件源码
项目:OpenYOLO-Android
阅读 37
收藏 0
点赞 0
评论 0
/**
 * Dispatches a query for {@code dataType} with an explicit timeout. The optional message is
 * serialized to bytes and handed to the byte-array overload.
 *
 * @param dataType the data type being queried
 * @param queryMessage optional protocol buffer message carried by the query; may be null, in
 *     which case null bytes are passed on
 * @param timeoutInMs timeout for the query, in milliseconds
 * @param callback receives the response to this query
 */
public void queryFor(
        @NonNull String dataType,
        @Nullable MessageLite queryMessage,
        long timeoutInMs,
        @NonNull QueryCallback callback) {
    byte[] serializedQuery = null;
    if (queryMessage != null) {
        serializedQuery = queryMessage.toByteArray();
    }
    queryFor(dataType, serializedQuery, timeoutInMs, callback);
}
ProtoListUtil.java 文件源码
项目:OpenYOLO-Android
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Creates a {@link ByteString} by serializing the list of protos. Use
 * {@link #readMessageList(ByteString, Parser)} to deserialize.
 *
 * @param protos the messages to serialize
 * @return the serialized list
 * @throws IllegalStateException if writing to the in-memory output fails; the underlying
 *     {@link IOException} is attached as the cause
 */
public static <T extends MessageLite> ByteString writeMessageList(List<T> protos) {
    Output output = ByteString.newOutput();
    try {
        writeMessageListTo(output, protos);
    } catch (IOException ex) {
        // Keep the original failure as the cause so it is not lost from the stack trace.
        throw new IllegalStateException("Unable to write protobufs to memory", ex);
    }
    return output.toByteString();
}
ProtoListUtil.java 文件源码
项目:OpenYOLO-Android
阅读 39
收藏 0
点赞 0
评论 0
/**
 * Writes the provided list of protos to the provided output stream: first the element count
 * as a 4-byte int, then each message in delimited form.
 *
 * @param stream the stream to write to
 * @param protos the messages to write
 * @throws IOException if the protos cannot be written to the provided output stream
 */
public static <T extends MessageLite> void writeMessageListTo(
        OutputStream stream,
        List<T> protos)
        throws IOException {
    // Count header first, written through a DataOutputStream wrapper around the same stream.
    DataOutputStream countWriter = new DataOutputStream(stream);
    countWriter.writeInt(protos.size());

    for (MessageLite message : protos) {
        message.writeDelimitedTo(stream);
    }
}
IMSwitchHandlerImpl.java 文件源码
项目:sctalk
阅读 30
收藏 0
点赞 0
评论 0
/**
 * Handles a P2P switch command: stamps the sender's user id onto the command, delivers it to
 * the local connections of the target and sending users, broadcasts it locally, and forwards
 * it to the message-server cluster.
 *
 * @param header header of the incoming message; reused for the forwarded message
 * @param body the command, cast to {@code IMSwitchService.IMP2PCmdMsg}
 * @param ctx channel context of the sender; used to resolve the sender's user id and to
 *     exclude the originating connection from the target user's broadcast
 */
@Override
public void switchP2p(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
    IMSwitchService.IMP2PCmdMsg p2pCmdMsg = (IMSwitchService.IMP2PCmdMsg) body;

    // Overwrite the sender id with the user id bound to this channel.
    long userId = super.getUserId(ctx);
    p2pCmdMsg = p2pCmdMsg.toBuilder().setFromUserId(userId).build();

    // Forward the rebuilt message. Previously the untouched `body` was sent, so the
    // fromUserId set above never reached any receiver.
    IMProtoMessage<MessageLite> switchP2pMsg = new IMProtoMessage<MessageLite>(header, p2pCmdMsg);

    long toId = p2pCmdMsg.getToUserId();
    long fromId = p2pCmdMsg.getFromUserId();
    ClientUser toClientUser = ClientUserManager.getUserById(toId);
    ClientUser fromClientUser = ClientUserManager.getUserById(fromId);

    // FIXME: the delivery logic below still needs confirmation (carried over from the
    // original author's note).
    if (toClientUser != null) {
        toClientUser.broadcast(switchP2pMsg, ctx);
    }
    if (fromClientUser != null) {
        fromClientUser.broadcast(switchP2pMsg, null);
    }
    ClientUserManager.broadCast(switchP2pMsg, SysConstant.CLIENT_TYPE_FLAG_BOTH);

    // Forward through the cluster.
    messageServerCluster.send(header, p2pCmdMsg);
}
PacketEncoder.java 文件源码
项目:sctalk
阅读 32
收藏 0
点赞 0
评论 0
/**
 * Encodes a protocol message as header bytes followed by the serialized protobuf body and
 * writes the result to {@code out}.
 *
 * <p>The header's length field is set to header length plus body length before the header is
 * encoded. Any exception is logged and not rethrown.
 *
 * @param ctx the channel handler context (not used here)
 * @param protoMessage the message to encode
 * @param out the buffer receiving the encoded packet
 */
@Override
protected void encode(final ChannelHandlerContext ctx, final IMProtoMessage<MessageLite> protoMessage, final ByteBuf out) throws Exception {
    try {
        logger.debug("Protobuf encode started.");

        // [HEADER] data
        IMHeader header = protoMessage.getHeader();
        byte[] bodyBytes = protoMessage.getBody().toByteArray();
        int bodyLength = bodyBytes.length;
        int headerLength = SysConstant.PROTOCOL_HEADER_LENGTH;

        // The length field covers header and body.
        header.setLength(headerLength + bodyLength);

        // Size the packet to header + body, then place the body right after the header. The
        // offset uses the same constant as the length computation instead of a literal 16.
        byte[] packet = Arrays.copyOf(header.encode().array(), headerLength + bodyLength);
        System.arraycopy(bodyBytes, 0, packet, headerLength, bodyLength);

        out.writeBytes(packet);
        logger.debug("Sent protobuf: commandId={}", header.getCommandId());
    } catch (Exception e) {
        logger.error("编码异常", e);
    } finally {
        logger.debug("Protobuf encode finished.");
    }
}
ClientUser.java 文件源码
项目:sctalk
阅读 36
收藏 0
点赞 0
评论 0
/**
 * Writes {@code message} to every connection in {@code connMap} except {@code fromCtx}.
 *
 * @param message the message to send
 * @param fromCtx the connection to skip (compared by reference); pass null to send to all
 */
public void broadcast(IMProtoMessage<MessageLite> message, ChannelHandlerContext fromCtx) {
    for (ChannelHandlerContext target : connMap.values()) {
        if (target == fromCtx) {
            // Do not echo the message back to the connection it came from.
            continue;
        }
        logger.debug("发送消息> {}", target.channel().remoteAddress());
        target.writeAndFlush(message);
    }
}