分布式事务-Seata saga模式源码分析

1. saga配置与使用

首先看下saga模式的配置,代码地址

<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
        <constructor-arg>
            <bean class="org.h2.jdbcx.JdbcDataSource">
                <property name="URL" value="jdbc:h2:mem:seata_saga" />
                <property name="user" value="sa" />
                <property name="password" value="sa" />
            </bean>
        </constructor-arg>
    </bean>

    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="classpath:sql/h2_init.sql" />
    </jdbc:initialize-database>

    <bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
        <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
    </bean>
    <bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
        <property name="dataSource" ref="dataSource"></property>
        <property name="resources" value="statelang/*.json"></property>
        <property name="enableAsync" value="true"></property>
        <property name="threadPoolExecutor" ref="threadExecutor"></property>
        <property name="applicationId" value="saga_sample"></property>
        <property name="txServiceGroup" value="my_test_tx_group"></property>
    </bean>
    <bean id="threadExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
        <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
        <property name="corePoolSize" value="1" />
        <property name="maxPoolSize" value="20" />
    </bean>
    <bean class="io.seata.saga.rm.StateMachineEngineHolder">
        <property name="stateMachineEngine" ref="stateMachineEngine"/>
    </bean>

在以上配置中的DbStateMachineConfig bean中的DefaultStateMachineConfig#afterPropertiesSet方法中会调用init方法来进行初始化状态机实例,主要逻辑如下:

  1. 初始化ExpressionFactoryManager,用于表达式解析,如spring Expression Language
  2. 初始化EvaluatorFactoryManager,用于执行表达式的验证
  3. 初始化StateMachineRepositoryImpl,用于状态机信息的维护,如状态机的初始化,存储等
    1. 初始化本地状态机数据库操作服务类,StateLangStore
    2. 加载所有状态机的配置StateMachineRepositoryImpl#registryByResources,首先从状态机组装json获取所有状态机配置,同时对比数据库中的状态机存储的配置,并更新相关数据,如更新时间等
  4. 初始化ProcessCtrlEventPublisher,用于状态机事件的同步提交,使用的是DirectEventBus来同步处理请求
  5. 初始化ProcessCtrlEventPublisher,用于状态机事件的异步提交,使用的AsyncEventBus来处理请求,AsyncEventBus主要逻辑通过线程池来异步处理

在上面流程registryByResources方法中,会从json配置中读取所有状态机流程,同时写入或更新到表seata_state_machine_def中,seata_state_machine_def中content字段表示json配置所有数据

然后我们看下事件处理流程,首先触发一次调用

//获取xml配置中添加的bean,ProcessCtrlStateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));

//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);

2. saga流程分析

stateMachineEngine.startWithBusinessKey方法会调用到ProcessCtrlStateMachineEngine#startInternal,方法主要有三个逻辑:

  1. 首先创建一个状态机实例StateMachineInstance,下文中会讲解状态机类StateMachine,这个大家可以理解为模板类
  2. 假如这个状态机实例需要持久化,则将状态机实例保存到state_machine_inst实例表中,假如当前状态机实例不是子状态机,还会调用beginTransaction,向TC开启全局事务
  3. 将当前请求组装到context中,推送给状态机进行处理
    StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
    
    if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
        stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
    }
    
    if (async) {
        stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
    } else {
        stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
    }
    
    return instance;

在createMachineInstance方法中的主要逻辑如下:

//tenantId加入为空时,会采用默认值
private StateMachineInstance createMachineInstance(String stateMachineName, String tenantId, String businessKey,
                                                       Map<String, Object> startParams) {
    //从数据库中获取对应状态机配置信息
    StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName,
    StateMachineInstanceImpl inst = new StateMachineInstanceImpl();
    ......
    inst.setStateMachine(stateMachine);
    ......
    return inst;
}

在DbAndReportTcStateLogStore#recordStateMachineStarted方法中

    public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {

        if (machineInstance != null) {
            //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
            //use parent transaction instead.
            String parentId = machineInstance.getParentId();
            if (StringUtils.hasLength(parentId)) {
                if (StringUtils.isEmpty(machineInstance.getId())) {
                    machineInstance.setId(parentId);
                }
            } else {
                //向TC注册全局事务
                beginTransaction(machineInstance, context);
            }


            if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
                machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
            }

            // 将当前状态机实例保存到state_machine_inst表中
            machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));
            executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType),
                    STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);
        }
    }

ProcessCtrlEventPublisher接收请求后,具体处理的类是DirectEventBus或AsyncEventBus,而这两个类都是通过ProcessCtrlEventConsumer来进行处理请求的,


image

从上图可以看出,请求最终会在ProcessControllerImpl类中进行处理,具体逻辑如下:

    public void process(ProcessContext context) throws FrameworkException {

        try {
            //businessProcessor为CustomizeBusinessProcessor类
            
            businessProcessor.process(context);

            businessProcessor.route(context);

        } catch (FrameworkException fex) {
            throw fex;
        } catch (Exception ex) {
            LOGGER.error("Unknown exception occurred, context = {}", context, ex);
            throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
        }
    }

上述代码其实seata saga模式的核心方法,businessProcessor.process方法是使用状态机中的某个state来进行处理,businessProcessor.route方法用于推动状态的变更,即使用当前state的next state来继续处理请求

接下来看下state的实际处理类StateMachineProcessHandler,该类initDefaultHandlers方法会初始化stateHandlers,增加不同状态的处理器

  1. ChoiceStateHandler 分支state处理器
  2. SucceedEndStateHandler,成功结束的处理器
  3. CompensationTriggerStateHandler,具体补偿触发处理器
  4. ServiceTaskStateHandler,执行服务调用处理的处理器
  5. FailEndStateHandler,失败结束的处理器

在StateMachineProcessHandler#process处理方法中,主要有一下三个逻辑:

  1. 从入参ProcessContext中取出当前状态的类型,并获取实际的状态处理器
StateHandler stateHandler = stateHandlers.get(stateType);
  1. 假如对应的StateHandler含有拦截器,则调用对应的拦截器
for (StateHandlerInterceptor interceptor : interceptors) {
                    executedInterceptors.add(interceptor);
                    interceptor.preProcess(context);
}
  1. 调用实际的状态处理器进行处理
stateHandler.process(context);

状态处理器处理完成后,然后会执行BusinessProcessor#route方法,该方法主要是根据StateMachineProcessHandler#process结果来切换当前请求的状态,而route方法会进到DefaultRouterHandler#route中进行处理:
该方法有三个逻辑:

  1. 根绝当前状态获取路由处理器
ProcessRouter processRouter = processRouters.get(processType.getCode());
  1. 实际执行路由处理器,返回当前请求下一个要执行的state,若没有下一个返回空
 Instruction instruction = processRouter.route(context);
  1. 假如存在下一个state,则将请求再发送给状态机接收类进行处理,这样推动请求一直被处理,直到被遇到终止state
//ProcessCtrlEventPublisher
eventPublisher.publish(context);

上面是saga状态机的处理与引擎的推动,下面主要介绍下saga模式下是如何与TC通信的?
上面流程都会提到拦截器,在初始化StateHandler或 StateRouter 时会增加拦截器,重点介绍以下两个

  1. ServiceTaskHandlerInterceptor
    1. 在进行state 处理前执行DbAndReportTcStateLogStore#recordStateStarted方法,保存当前state状态到表state_inst中,同时向TC注册分支事务DbAndReportTcStateLogStore#branchRegister
    2. 在state处理完成后,会执行DbAndReportTcStateLogStore#recordStateFinished方法,更新表state_inst中state的状态,同时向TC发送report消息,具体方法在SagaTransactionalTemplate#branchReport
  2. EndStateRouterInterceptor,在执行EndStateRouter后,会执行拦截器中postRoute方法,该方法会执行EngineUtils.endStateMachine,在endStateMachine方法中会调用DbAndReportTcStateLogStore#recordStateMachineFinished来更新state_machine_inst表中状态机实例状态,同时调用SagaTransactionalTemplate#reportTransaction来通知TC该事务已完成

3. 异常处理:

  1. 当状态机模版不是一个完备的,即有可能分支路由后不能到终止state,则会抛异常来终止
  2. 在执行过程中遇到不能处理的异常,则会抛异常来终止

4.TC消息处理:

  1. beginTransaction,发起全局事务
  2. branchRegister,进行分支注册
  3. BranchReportRequest消息,主要执行coordinator.AbstractCore#branchReport方法,修改表branch_table中对应分支事务的状态
  4. SagaCore#doGlobalReport,在全局事务report时,如果是状态是终止状态后,则会更新全局事务的状态,如果不是终止状态,则会更改事务状态,来进行重试
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容