TM、RM和TC之间如何通信。(源码持续更新,本文仅供参考)
Fescar处理分布式事务,本身也是分布式系统。其中TM和RM可理解为客户端Client,TC事务协调器作为服务端Server。
Fescar中两者之间的通信,毫不意外是通过Netty实现的。从以理解Fescar作为目标这一角度来说,单纯的网络通信的模块,其实没有必要去刨根究底,关注着一细节。
但是目前Fescar的实现中,请求、响应、编码解码、消息处理等等联系的比较紧密(耦合了。。),如果想更好的跟踪、理解Fescar是如何处理不同类型的事务消息的,还是有必要把Fescar的rpc实现梳理一下。
消息
Fescar为每一种交互都定义了一个消息类(request和response为对应一组),为了容易理解,此图只包含Request,,从上图可以看出,
消息主要分三大类
- AbstractTransactionRequestToTC
RM/TM发送到TC的消息 - AbstractTransactionRequestToRM
TC发送到TM的消息 - AbstractIdentifyRequest
TM/RM和TC连接时的注册消息
另外一个MergedWarpMessage
是一个消息包装类,内部包含多个消息,用于消息的异步批量发送。
RPC核心类
核心类图如下:
主要实现类:
- TC实现
RpcServer extends AbstractRpcRemotingServer
- TM实现
TmRpcClient extends AbstractRpcRemotingClient
- RM实现
RMRpcClient extends AbstractRpcRemotingClient
他们拥有共同父类AbstractRpcRemoting
。
AbstractRpcRemoting提供了基本的消息发送和消息处理实现。
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof RpcMessage) {
final RpcMessage rpcMessage = (RpcMessage)msg;
if (rpcMessage.isRequest()) {
try {
AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
@Override
public void run() {
try {
dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th);
}
}
});
} catch (RejectedExecutionException e) {
//...
}
} else {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
try {
AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
@Override
public void run() {
try {
dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th);
}
}
});
} catch (RejectedExecutionException e) {
//...
}
}
}
}
}
public abstract void dispatch(long msgId, ChannelHandlerContext ctx, Object msg);
可以看到RpcMessage
就是Fescar系统间交互的信息载体,所有的消息都是在此进行处理(RpcServer
中Overide
了此方法,对一些特殊消息进行了提前处理,但最终还是调用此处的方法)。
所有的消息都是异步消费,交由dispatch
方法处理。
TC处理消息
RpcServer
的dispatch实现,所有消息都委托给ServerMessageListener
处理。
// RpcServer
@Override
public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
if (msg instanceof RegisterRMRequest) {
serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
checkAuthHandler);
} else {
if (ChannelManager.isRegistered(ctx.channel())) {
serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
} else {
closeChannelHandlerContext(ctx);
}
}
}
- 注册类消息
对注册TM和注册RM的消息的处理主要是将channel和请求绑定(IP,PORT,ResourceId等等),绑定关系使用ChannelManager
和RpcContext
管理。最终ChannelManager
将TM和RM的Channel缓存在两个很复杂的map中。
具体绑定过程比较复杂(有点乱。。)先忽略
// resourceId -> applicationId -> ip -> port -> RpcContext
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<>();
//ip+appname,port
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS = new ConcurrentHashMap<>();
- 事务消息
@Override
public void onTrxMessage(long msgId, ChannelHandlerContext ctx, Object message, ServerMessageSender sender) {
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (message instanceof MergedWarpMessage) {
AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()];
for (int i = 0; i < results.length; i++) {
final AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i);
results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results);
sender.sendResponse(msgId, ctx.channel(), resultMessage);
} else if (message instanceof AbstractResultMessage) {
transactionMessageHandler.onResponse((AbstractResultMessage)message, rpcContext);
}
}
层层委托后,所有消息最终委托给DefaultCoordinator
和DefaultCore
处理,具体逻辑不在此文分析。
TM处理消息
准确的说TM主要是发送消息到TC(register、commit和rollback等),对于入站消息的处理没有特殊实现。
RM处理消息
RM除了需要发送消息到TC外,需要处理TC的branchCommit和branchRollback消息,参见RmMessageListener
@Override
public void onMessage(long msgId, String serverAddress, Object msg, ClientMessageSender sender) {
if (msg instanceof BranchCommitRequest) {
handleBranchCommit(msgId, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
handleBranchRollback(msgId, serverAddress, (BranchRollbackRequest)msg, sender);
}
}
合并消息
发送消息时,为了提高吞吐量,Fescar通过MergedSendRunnable
来合并消息,批量异步发送;合并后的Request消息为MergedWarpMessage
启动Fescar
最后提一下Fescar的启动方式。
public interface RemotingService {
void start();
void shutdown();
}
RemotingService
定义了TM、RM、TC的启动和关闭;不过实际的实现中,Fescar是通过AbstractRpcRemoting
的init()方法去完成服务的初始化,各种任务线程池的启动,以及服务长连接的建立。
以RmCpcCLient
为例:
//RmRpcClient
@Override
public void init() {
if (initialized.compareAndSet(false, true)) {
super.init();
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
reconnect(); //读取配置,获取TC地址,连接TC
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable()); //启动消息合并发送的任务
}
}
OK,对Fescar的RPC和消息模块大致有了总体认识了,后面再去看看Fescar是具体是怎么流程化去处理各种事务消息的。