分布式事务Seata-TCC源码分析

为了更好理解分布式事务,首先提出一个问题:

假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚。针对这种情况处理方式很简单,只需要使用个事务就好了。
但假如ta和tb不在一个数据库中或者不在一个数据库实例上,此时数据库的事务这两个表也是无法同时管理的,针对这种情况要如何解决了?如何保证对ta和tb操作的一致性?

此时可以通过TCC来解决上述问题,TCC通过实现两阶段协议,将服务流程抽象为Try-Confirm-Cancel 三个操作:
第一阶段:try,主要用于对资源的预留
第二阶段:comfirm/cancel,comfirm用于对预留资源的使用,对业务进行提交,cancel是对预留资源的释放,对业务进行回滚操作

下面从三个方面介绍TCC

  1. seata中TCC的源码实现
  2. 写好TCC实现的注意点
  3. seata中TCC模式如何做到高可用的

1. seata中TCC的实现

seata主要由三个模块组成

  1. TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。
  2. TM (Transaction Manager) - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。
  3. RM (Resource Manager) - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

首先我们看下使用方式,参考源码:https://github.com/seata/seata-samples/tree/master/tcc/dubbo-tcc-sample
服务提供方提供两个服务,TccActionOne,TccActionTwo

public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, int a);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);
}

public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, String b);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);

}

同时在provider和consumer端都要引入具体的GlobalTransactionScanner,该类会对TM和RM进行初始化和注册:

<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="tcc-sample"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>

具体调用如图:


image.jpeg

以上是简单的使用方式,有了上面基本使用流程的介绍后,现在开始分析下具体的代码实现,首先重点关注下以下的类和注解:

  1. GlobalTransactionScanner,用于扫描是否开启了分布式事务,并对加了分布式事务注解的方法注入代理,如TwoPhaseBusinessAction和GlobalTransactional
  2. 注解TwoPhaseBusinessAction,表示该方法使用的TCC模式,并同时制定commit和cancel方法
  3. 注解GlobalTransactional,用于表示被修饰的方法会开启分布式事务来进行处理

GlobalTransactionScanner通过AbstractAutoProxyCreator类,来为被分布式相关注解修饰的方法添加动态代理,所以在服务启动时,会执行GlobalTransactionScanner类中相关方法,
主要涉及的方法有:

  1. GlobalTransactionScanner#initClient,初始化TM和RM客户端
  2. GlobalTransactionScanner#wrapIfNecessary,为添加了TwoPhaseBusinessAction和GlobalTransactional注解的方法添加代理,同时分别为修饰的方法注入TccActionInterceptor和GlobalTransactionalInterceptor代理类,同时会将本地服务作为RM客户端注册到TC服务端中

1.1 客户端初始化

所以GlobalTransactionScanner是Seata客户端的启动类,首先看下TM和RM客户端的初始化
TM和RM会分别初始化TmNettyRemotingClient和RmNettyRemotingClient,这个两个类的父类都是AbstractNettyRemotingClient,在该类的init方法中,会启动一个定时来检查TC的channel是否存活,同时会发送注册信息到TC中,最后会启动netty客户端

public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //检测连接TC的channel是否存活,若不存在对应channel或者channel已关闭,则会重新连接到TC,同时发送注册信息到TC服务中
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

在NettyClientChannelManager#reconnect方法中,会通过获取所有注册到注册中心的TC服务地址,然后判断当前缓存NettyClientChannelManager#channels中是否存在对应地址且存活状态的channel,若不存在,则会为该TC地址创建channel,同时向改地址发送注册信息,TmNettyRemotingClient和RmNettyRemotingClient注册信息分别为RegisterTMRequest和RegisterRMRequest,主要方法步骤是在netty.NettyClientChannelManager#doConnect中创建channel,然后在NettyPoolableFactory#makeObject方法中发送对应的注册消息

而在GlobalTransactionScanner#wrapIfNecessary方法中,会为TwoPhaseBusinessAction和GlobalTransactional修饰的方法添加代理实现

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (interceptor == null) {
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

方法中会为TwoPhaseBusinessAction注解修饰的方法生成TccActionInterceptor代理,为GlobalTransactional生成GlobalTransactionalInterceptor代理。
但在TCCBeanParserUtils#isTccAutoProxy方法中若存在TwoPhaseBusinessAction注解,会通过RmNettyRemotingClient#registerResource发送注册信息,具体方法在DefaultRemotingParser#parserRemotingServiceInfo中,个人觉得这个步骤可以去掉了有点冗余

1.2 服务端初始化

TC服务端启动类io.seata.server.Server#main,该方法中会初始化DefaultCoordinator类,这个类是所有消息的处理类,DefaultCoordinator主要属性如下

//各种定时任务,用来重试
private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryRollbacking", 1));
private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryCommitting", 1));
private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("AsyncCommitting", 1));
private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("TxTimeoutCheck", 1));
private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("UndoLogDelete", 1));
private RemotingServer remotingServer;  //消息通信服务端
private DefaultCore core;  //主要的事务处理

针对消息的处理流程,具体方法在NettyRemotingServer#registerProcessor:

private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    //处理事务提交回滚等消息    
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor,对分支事务提交和回滚响应结果的处理
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. registry rm message processor,处理RM客户端的注册消息
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor,处理TM客户端的注册消息
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor,处理客户端的心跳消息
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

从上图可以看出注册消息是在RegRmProcessor和RegTmProcessor中进行处理。

1.3 TCC消息处理分析

首先还是看下一开始的示例代码:

@GlobalTransactional
public String doTransactionCommit(){
    //第一个TCC 事务参与者
    boolean result = tccActionOne.prepare(null, 1);
    if(!result){
        throw new RuntimeException("TccActionOne failed.");
    }
    List list = new ArrayList();
    list.add("c1");
    list.add("c2");
    result = tccActionTwo.prepare(null, "two", list);
    if(!result){
        throw new RuntimeException("TccActionTwo failed.");
    }
    return RootContext.getXID();
}

在调用doTransactionCommit方法时,会进入到代理类GlobalTransactionalInterceptor中,最终会执行到TransactionalTemplate#execute方法,该方法的主要逻辑如下:

beginTransaction(txInfo, tx);//开始事务
rs = business.execute(); //执行业务代码,即执行doTransactionCommit方法
commitTransaction(tx);  //提交事务  
  1. 在开始事务beginTransaction方法中,会向TC服务发送GlobalBeginRequest消息,来获取事务xid,该消息最终会在服务端DefaultCore#begin方法中得到处理:

    1. 通过雪花算法产生一个随机数作为transactionId.根据transactionId生成xid,具体规则是ipAddress + ":" + port + ,":" + transactionId,ipAddress为本机ip,port为当前服务的端口
    2. 将全局事务记录写入global_table表中,同时返回xid,表中xid为主键,transactionId为索引
  2. 执行业务代码,业务代码会调用远端服务,如tccActionOne.prepare方法,由于该方法被TwoPhaseBusinessAction修饰,会执行代理类TccActionInterceptor,在TccActionInterceptor类中的invoke方法主要逻辑如下:

String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); //注册分支事务,同时获取分支事务id
.....
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute()); #执行实际的分支事务的业务代码,如tccActionOne.prepare方法  

其中在doTccActionLogStore方法中客户端通过发送BranchRegisterRequest消息,其中clientId和lockKeys都是null,resourceId为TwoPhaseBusinessAction中name名称该消息最终会在服务端AbstractCore#branchRegister:
通过随机算法生成branchId
生成一个分支记录,将记录插入到branch_table表中,其中branchId为主键

  1. doTransactionCommit所有业务逻辑执行成功后,GlobalTransactionalInterceptor代理类会执行到commitTransaction方法,进行全局事务的提交,客户端会通过DefaultTransactionManager#commit方法发送GlobalCommitRequest事务提交消息,此时服务端接收到该消息后,会通过DefaultCore#doGlobalCommit方法进行全局事务的提交,该方法的主要逻辑如下:
    1. 从存储中获取全局事务xid下所有分支事务记录,为每个分支事务调用AbstractCore#branchCommitSend方法,发送BranchCommitRequest消息到对应的分支事务客户端,来进行分支事务的commit,客户端接收到消息后,会执行TwoPhaseBusinessAction注解中填写的commit方法来完成分支事务的提交
  2. 当有任何一个分支服务调用失败时,如tccActionOne.prepare调用失败,会回滚全局事务,然后TC服务端会回滚所有的分支事务

2. 写好TCC实现的注意点

写好一个完备的TCC的实现是有一定的要求,需要解决空回滚,幂等操作和悬挂问题。

  1. 空回滚
    即全局事务回滚时,有可能分支事务try接口由于网络问题并没有被触发或者还在处于try阶段,此时TC已经触发了分支事务的cancel,此时需要分支事务服务需要返回成功,不然会有重试,即分支事务要支持空回滚
  2. 幂登性
    由于网络抖动问题,分支事务中的try方法可能会被执行多次,所以要保证资源不会被重复消耗,解决办法可以通过为每一个请求维护一个唯一id,如分支事务id,来过滤重复的请求
  3. 悬挂问题
    当全局事务回滚时,由于分支事务try方法执行了较长时间,导致分支事务执行cancel方法后,try方法才执行成功,这样导致被try锁定的资源得不到释放,解决办法是将每个分支事务的请求记录下来,所以当执行try方法后,发现已经存在cancel的执行记录后,则回滚当前的try操作

3. seata中TCC模式如何做到高可用的

要做到高可用,要做到服务的无状态,为了做到这点seata做了如下工作:

  1. 存储,TC中事务数据的存储避免使用本地存储,可以使用mysql等
  2. 服务发现与注册,
    从上文实现分析中,我们可以看出TC服务会将本服务的ip注册到注册中心,如zk,etcd等,TM和RM客户端会拉取所有TC服务端的地址,同时将客户端服务的ip注册到所有TC服务中,这样保证了每个TC服务都有所有客户端的链接信息
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容