为了更好理解分布式事务,首先提出一个问题:
假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚。针对这种情况处理方式很简单,只需要使用个事务就好了。
但假如ta和tb不在一个数据库中或者不在一个数据库实例上,此时数据库的事务这两个表也是无法同时管理的,针对这种情况要如何解决了?如何保证对ta和tb操作的一致性?
此时可以通过TCC来解决上述问题,TCC通过实现两阶段协议,将服务流程抽象为Try-Confirm-Cancel 三个操作:
第一阶段:try,主要用于对资源的预留
第二阶段:comfirm/cancel,comfirm用于对预留资源的使用,对业务进行提交,cancel是对预留资源的释放,对业务进行回滚操作
下面从三个方面介绍TCC
- seata中TCC的源码实现
- 写好TCC实现的注意点
- seata中TCC模式如何做到高可用的
1. seata中TCC的实现
seata主要由三个模块组成
- TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
首先我们看下使用方式,参考源码:https://github.com/seata/seata-samples/tree/master/tcc/dubbo-tcc-sample
服务提供方提供两个服务,TccActionOne,TccActionTwo
public interface TccActionOne {
@TwoPhaseBusinessAction(name = "DubboTccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
public interface TccActionTwo {
@TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, String b);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
同时在provider和consumer端都要引入具体的GlobalTransactionScanner,该类会对TM和RM进行初始化和注册:
<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="tcc-sample"/>
<constructor-arg value="my_test_tx_group"/>
</bean>
具体调用如图:
以上是简单的使用方式,有了上面基本使用流程的介绍后,现在开始分析下具体的代码实现,首先重点关注下以下的类和注解:
- GlobalTransactionScanner,用于扫描是否开启了分布式事务,并对加了分布式事务注解的方法注入代理,如TwoPhaseBusinessAction和GlobalTransactional
- 注解TwoPhaseBusinessAction,表示该方法使用的TCC模式,并同时制定commit和cancel方法
- 注解GlobalTransactional,用于表示被修饰的方法会开启分布式事务来进行处理
GlobalTransactionScanner通过AbstractAutoProxyCreator类,来为被分布式相关注解修饰的方法添加动态代理,所以在服务启动时,会执行GlobalTransactionScanner类中相关方法,
主要涉及的方法有:
- GlobalTransactionScanner#initClient,初始化TM和RM客户端
- GlobalTransactionScanner#wrapIfNecessary,为添加了TwoPhaseBusinessAction和GlobalTransactional注解的方法添加代理,同时分别为修饰的方法注入TccActionInterceptor和GlobalTransactionalInterceptor代理类,同时会将本地服务作为RM客户端注册到TC服务端中
1.1 客户端初始化
所以GlobalTransactionScanner是Seata客户端的启动类,首先看下TM和RM客户端的初始化
TM和RM会分别初始化TmNettyRemotingClient和RmNettyRemotingClient,这个两个类的父类都是AbstractNettyRemotingClient,在该类的init方法中,会启动一个定时来检查TC的channel是否存活,同时会发送注册信息到TC中,最后会启动netty客户端
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//检测连接TC的channel是否存活,若不存在对应channel或者channel已关闭,则会重新连接到TC,同时发送注册信息到TC服务中
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
在NettyClientChannelManager#reconnect方法中,会通过获取所有注册到注册中心的TC服务地址,然后判断当前缓存NettyClientChannelManager#channels中是否存在对应地址且存活状态的channel,若不存在,则会为该TC地址创建channel,同时向改地址发送注册信息,TmNettyRemotingClient和RmNettyRemotingClient注册信息分别为RegisterTMRequest和RegisterRMRequest,主要方法步骤是在netty.NettyClientChannelManager#doConnect中创建channel,然后在NettyPoolableFactory#makeObject方法中发送对应的注册消息
而在GlobalTransactionScanner#wrapIfNecessary方法中,会为TwoPhaseBusinessAction和GlobalTransactional修饰的方法添加代理实现
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
方法中会为TwoPhaseBusinessAction注解修饰的方法生成TccActionInterceptor代理,为GlobalTransactional生成GlobalTransactionalInterceptor代理。
但在TCCBeanParserUtils#isTccAutoProxy方法中若存在TwoPhaseBusinessAction注解,会通过RmNettyRemotingClient#registerResource发送注册信息,具体方法在DefaultRemotingParser#parserRemotingServiceInfo中,个人觉得这个步骤可以去掉了有点冗余
1.2 服务端初始化
TC服务端启动类io.seata.server.Server#main,该方法中会初始化DefaultCoordinator类,这个类是所有消息的处理类,DefaultCoordinator主要属性如下
//各种定时任务,用来重试
private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryRollbacking", 1));
private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryCommitting", 1));
private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("AsyncCommitting", 1));
private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("TxTimeoutCheck", 1));
private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("UndoLogDelete", 1));
private RemotingServer remotingServer; //消息通信服务端
private DefaultCore core; //主要的事务处理
针对消息的处理流程,具体方法在NettyRemotingServer#registerProcessor:
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
//处理事务提交回滚等消息
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor,对分支事务提交和回滚响应结果的处理
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. registry rm message processor,处理RM客户端的注册消息
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor,处理TM客户端的注册消息
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor,处理客户端的心跳消息
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
从上图可以看出注册消息是在RegRmProcessor和RegTmProcessor中进行处理。
1.3 TCC消息处理分析
首先还是看下一开始的示例代码:
@GlobalTransactional
public String doTransactionCommit(){
//第一个TCC 事务参与者
boolean result = tccActionOne.prepare(null, 1);
if(!result){
throw new RuntimeException("TccActionOne failed.");
}
List list = new ArrayList();
list.add("c1");
list.add("c2");
result = tccActionTwo.prepare(null, "two", list);
if(!result){
throw new RuntimeException("TccActionTwo failed.");
}
return RootContext.getXID();
}
在调用doTransactionCommit方法时,会进入到代理类GlobalTransactionalInterceptor中,最终会执行到TransactionalTemplate#execute方法,该方法的主要逻辑如下:
beginTransaction(txInfo, tx);//开始事务
rs = business.execute(); //执行业务代码,即执行doTransactionCommit方法
commitTransaction(tx); //提交事务
-
在开始事务beginTransaction方法中,会向TC服务发送GlobalBeginRequest消息,来获取事务xid,该消息最终会在服务端DefaultCore#begin方法中得到处理:
- 通过雪花算法产生一个随机数作为transactionId.根据transactionId生成xid,具体规则是ipAddress + ":" + port + ,":" + transactionId,ipAddress为本机ip,port为当前服务的端口
- 将全局事务记录写入global_table表中,同时返回xid,表中xid为主键,transactionId为索引
执行业务代码,业务代码会调用远端服务,如tccActionOne.prepare方法,由于该方法被TwoPhaseBusinessAction修饰,会执行代理类TccActionInterceptor,在TccActionInterceptor类中的invoke方法主要逻辑如下:
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); //注册分支事务,同时获取分支事务id
.....
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute()); #执行实际的分支事务的业务代码,如tccActionOne.prepare方法
其中在doTccActionLogStore方法中客户端通过发送BranchRegisterRequest消息,其中clientId和lockKeys都是null,resourceId为TwoPhaseBusinessAction中name名称该消息最终会在服务端AbstractCore#branchRegister:
通过随机算法生成branchId
生成一个分支记录,将记录插入到branch_table表中,其中branchId为主键
- doTransactionCommit所有业务逻辑执行成功后,GlobalTransactionalInterceptor代理类会执行到commitTransaction方法,进行全局事务的提交,客户端会通过DefaultTransactionManager#commit方法发送GlobalCommitRequest事务提交消息,此时服务端接收到该消息后,会通过DefaultCore#doGlobalCommit方法进行全局事务的提交,该方法的主要逻辑如下:
- 从存储中获取全局事务xid下所有分支事务记录,为每个分支事务调用AbstractCore#branchCommitSend方法,发送BranchCommitRequest消息到对应的分支事务客户端,来进行分支事务的commit,客户端接收到消息后,会执行TwoPhaseBusinessAction注解中填写的commit方法来完成分支事务的提交
- 当有任何一个分支服务调用失败时,如tccActionOne.prepare调用失败,会回滚全局事务,然后TC服务端会回滚所有的分支事务
2. 写好TCC实现的注意点
写好一个完备的TCC的实现是有一定的要求,需要解决空回滚,幂等操作和悬挂问题。
-
空回滚
即全局事务回滚时,有可能分支事务try接口由于网络问题并没有被触发或者还在处于try阶段,此时TC已经触发了分支事务的cancel,此时需要分支事务服务需要返回成功,不然会有重试,即分支事务要支持空回滚 -
幂登性
由于网络抖动问题,分支事务中的try方法可能会被执行多次,所以要保证资源不会被重复消耗,解决办法可以通过为每一个请求维护一个唯一id,如分支事务id,来过滤重复的请求 -
悬挂问题
当全局事务回滚时,由于分支事务try方法执行了较长时间,导致分支事务执行cancel方法后,try方法才执行成功,这样导致被try锁定的资源得不到释放,解决办法是将每个分支事务的请求记录下来,所以当执行try方法后,发现已经存在cancel的执行记录后,则回滚当前的try操作
3. seata中TCC模式如何做到高可用的
要做到高可用,要做到服务的无状态,为了做到这点seata做了如下工作:
- 存储,TC中事务数据的存储避免使用本地存储,可以使用mysql等
-
服务发现与注册,
从上文实现分析中,我们可以看出TC服务会将本服务的ip注册到注册中心,如zk,etcd等,TM和RM客户端会拉取所有TC服务端的地址,同时将客户端服务的ip注册到所有TC服务中,这样保证了每个TC服务都有所有客户端的链接信息