通过fescar项目官方的这张图可以知道,RM及RM与TC之间需要进行远程通讯。
而通过查看工程依赖及源码能了解到其远程通讯主要是依赖于netty的技术组件包来实现的RPC。具体的设计模型如下所示:
ChannelDuplexHandler
从下图中可以了解到fescar对netty最主要的API依赖就是io.netty.channel.ChannelDuplexHandler
。
它实现了基于Socket进行网络传输的基本操作,如:
-
bind
端口绑定。 -
connect/disconnect
创建连接/关闭连接。 -
read
从端口连接中接收数据。 -
write
从端口连接中写入数据。
等等......
AbstractRpcRemoting
AbstractRpcRemoting继承于ChannelDuplexHandler,主要是针对于fescar的业务场景再进行一次封装。如:
-
channelRead
过滤消息,并使用线程池异步对消息做业务处理。 - 定义抽象方法
dispatch
,将消息的业务处理做进一步封装。(因为前端与后端对消息处理的逻辑不一样,交由子类来实现。) -
sendAsycRequest
添加多线程异步请求及响应处理。 - 对远程调用的数据结构进行封装,所有消息都需要实现
MessageCodec
接口。
public interface MessageCodec {
short getTypeCode();
byte[] encode();
boolean decode(ByteBuf in);
}
- 等等......
AbstractRpcRemotingClient
AbstractRpcRemotingClient继承于AbstractRpcRemoting,主要是针对fescar的业务场景对客户端的RPC调用做进一步的业务封装。如:
- 实现
RemotingService
接口,实现RPC服务开启及关闭的逻辑 。
public interface RemotingService {
void start();
void shutdown();
}
- 实现
ClientMessageSender
接口,实现客户端RPC消息发送的功能。
public interface ClientMessageSender {
Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException;
Object sendMsgWithResponse(String serverAddress, Object msg, long timeout) throws TimeoutException;
Object sendMsgWithResponse(Object msg) throws TimeoutException;
void sendResponse(long msgId, String serverAddress, Object msg);
}
-
channelRead
: 基于客户端消息处理逻辑做业务封装。 -
dispatch
:针对客户端实现消息处理逻辑,(基于ClientMessageListener
接口的实现来处理)。
if (clientMessageListener != null) {
String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
clientMessageListener.onMessage(msgId, remoteAddress, msg, this);
}
-
init
:初始化netty连接配置 - 等等......
TmRpcClient
TmRpcClient继承于AbstractRpcRemotingClient,针对事务管理器
的RPC调用实现了以下功能:
- 实现了
RegisterMsgListener
接口,对客户端注册到服务端的消息做监听处理。
public interface RegisterMsgListener {
void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage);
void onRegisterMsgFail(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage);
}
- 重载父类对
ClientMessageSender
接口实现,实现RPC客户端发送消息的功能。 - 重载父类
init
方法:基于配置创建线程池并使用NettyAPI创建RPC连接池。 - 等等......
RmRpcClient
RmRpcClient与TmRpcClient都继承于AbstractRpcRemotingClient,基本要实现的功能与其类似,只是针对资源管理器
逻辑有差异,这里不再赘述。
AbstractRpcRemotingServer
AbstractRpcRemotingServer继承于AbstractRpcRemoting,主要是针对fescar的业务场景对服务端的RPC调用做进一步的业务封装。如:
- 实现
RemotingService
接口,实现RPC服务开启及关闭的逻辑 。 - 实现
RemotingServer
接口,该接口暂无需要的实现方法。 - 等等......
RpcServer
RpcServer继承于AbstractRpcRemotingServer,针对事务协调管理器
的RPC调用实现了以下功能:
- 实现
ServerMessageSender
接口,实现服务端RPC消息发送的功能。
public interface ServerMessageSender {
void sendResponse(long msgId, Channel channel, Object msg);
Object sendSyncRequest(String resourceId, String clientId, Object message, long timeout) throws IOException, TimeoutException;
Object sendSyncRequest(String resourceId, String clientId, Object message) throws IOException, TimeoutException;
}
-
channelRead
: 基于服务端消息处理逻辑做业务封装。 -
dispatch
:针对服务端实现消息处理逻辑(基于ServerMessageListener
接口的实现来处理)。
public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
if (msg instanceof RegisterRMRequest) {
serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
checkAuthHandler);
} else {
if (ChannelManager.isRegistered(ctx.channel())) {
serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
} else {
try {
closeChannelHandlerContext(ctx);
......
-
init
:初始化netty连接配置 - 等等......