1. saga配置与使用
首先看下saga模式的配置,代码地址:
<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
<constructor-arg>
<bean class="org.h2.jdbcx.JdbcDataSource">
<property name="URL" value="jdbc:h2:mem:seata_saga" />
<property name="user" value="sa" />
<property name="password" value="sa" />
</bean>
</constructor-arg>
</bean>
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="classpath:sql/h2_init.sql" />
</jdbc:initialize-database>
<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
<property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
</bean>
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
<property name="dataSource" ref="dataSource"></property>
<property name="resources" value="statelang/*.json"></property>
<property name="enableAsync" value="true"></property>
<property name="threadPoolExecutor" ref="threadExecutor"></property>
<property name="applicationId" value="saga_sample"></property>
<property name="txServiceGroup" value="my_test_tx_group"></property>
</bean>
<bean id="threadExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
<property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="20" />
</bean>
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
<property name="stateMachineEngine" ref="stateMachineEngine"/>
</bean>
在以上配置中的DbStateMachineConfig bean中的DefaultStateMachineConfig#afterPropertiesSet方法中会调用init方法来进行初始化状态机实例,主要逻辑如下:
- 初始化ExpressionFactoryManager,用于表达式解析,如spring Expression Language
- 初始化EvaluatorFactoryManager,用于执行表达式的验证
- 初始化StateMachineRepositoryImpl,用于状态机信息的维护,如状态机的初始化,存储等
- 初始化本地状态机数据库操作服务类,StateLangStore
- 加载所有状态机的配置StateMachineRepositoryImpl#registryByResources,首先从状态机组装json获取所有状态机配置,同时对比数据库中的状态机存储的配置,并更新相关数据,如更新时间等
- 初始化ProcessCtrlEventPublisher,用于状态机事件的同步提交,使用的是DirectEventBus来同步处理请求
- 初始化ProcessCtrlEventPublisher,用于状态机事件的异步提交,使用的AsyncEventBus来处理请求,AsyncEventBus主要逻辑通过线程池来异步处理
在上面流程registryByResources方法中,会从json配置中读取所有状态机流程,同时写入或更新到表seata_state_machine_def中,seata_state_machine_def中content字段表示json配置所有数据
然后我们看下事件处理流程,首先触发一次调用
//获取xml配置中添加的bean,ProcessCtrlStateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
2. saga流程分析
stateMachineEngine.startWithBusinessKey方法会调用到ProcessCtrlStateMachineEngine#startInternal,方法主要有三个逻辑:
- 首先创建一个状态机实例StateMachineInstance,下文中会讲解状态机类StateMachine,这个大家可以理解为模板类
- 假如这个状态机实例需要持久化,则将状态机实例保存到state_machine_inst实例表中,假如当前状态机实例不是子状态机,还会调用beginTransaction,向TC开启全局事务
- 将当前请求组装到context中,推送给状态机进行处理
StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
}
if (async) {
stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
} else {
stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
}
return instance;
在createMachineInstance方法中的主要逻辑如下:
//tenantId加入为空时,会采用默认值
private StateMachineInstance createMachineInstance(String stateMachineName, String tenantId, String businessKey,
Map<String, Object> startParams) {
//从数据库中获取对应状态机配置信息
StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName,
StateMachineInstanceImpl inst = new StateMachineInstanceImpl();
......
inst.setStateMachine(stateMachine);
......
return inst;
}
在DbAndReportTcStateLogStore#recordStateMachineStarted方法中
public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {
if (machineInstance != null) {
//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
} else {
//向TC注册全局事务
beginTransaction(machineInstance, context);
}
if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
}
// 将当前状态机实例保存到state_machine_inst表中
machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));
executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType),
STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);
}
}
ProcessCtrlEventPublisher接收请求后,具体处理的类是DirectEventBus或AsyncEventBus,而这两个类都是通过ProcessCtrlEventConsumer来进行处理请求的,
从上图可以看出,请求最终会在ProcessControllerImpl类中进行处理,具体逻辑如下:
public void process(ProcessContext context) throws FrameworkException {
try {
//businessProcessor为CustomizeBusinessProcessor类
businessProcessor.process(context);
businessProcessor.route(context);
} catch (FrameworkException fex) {
throw fex;
} catch (Exception ex) {
LOGGER.error("Unknown exception occurred, context = {}", context, ex);
throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
}
}
上述代码其实seata saga模式的核心方法,businessProcessor.process方法是使用状态机中的某个state来进行处理,businessProcessor.route方法用于推动状态的变更,即使用当前state的next state来继续处理请求
接下来看下state的实际处理类StateMachineProcessHandler,该类initDefaultHandlers方法会初始化stateHandlers,增加不同状态的处理器
- ChoiceStateHandler 分支state处理器
- SucceedEndStateHandler,成功结束的处理器
- CompensationTriggerStateHandler,具体补偿触发处理器
- ServiceTaskStateHandler,执行服务调用处理的处理器
- FailEndStateHandler,失败结束的处理器
在StateMachineProcessHandler#process处理方法中,主要有一下三个逻辑:
- 从入参ProcessContext中取出当前状态的类型,并获取实际的状态处理器
StateHandler stateHandler = stateHandlers.get(stateType);
- 假如对应的StateHandler含有拦截器,则调用对应的拦截器
for (StateHandlerInterceptor interceptor : interceptors) {
executedInterceptors.add(interceptor);
interceptor.preProcess(context);
}
- 调用实际的状态处理器进行处理
stateHandler.process(context);
状态处理器处理完成后,然后会执行BusinessProcessor#route方法,该方法主要是根据StateMachineProcessHandler#process结果来切换当前请求的状态,而route方法会进到DefaultRouterHandler#route中进行处理:
该方法有三个逻辑:
- 根绝当前状态获取路由处理器
ProcessRouter processRouter = processRouters.get(processType.getCode());
- 实际执行路由处理器,返回当前请求下一个要执行的state,若没有下一个返回空
Instruction instruction = processRouter.route(context);
- 假如存在下一个state,则将请求再发送给状态机接收类进行处理,这样推动请求一直被处理,直到被遇到终止state
//ProcessCtrlEventPublisher
eventPublisher.publish(context);
上面是saga状态机的处理与引擎的推动,下面主要介绍下saga模式下是如何与TC通信的?
上面流程都会提到拦截器,在初始化StateHandler或 StateRouter 时会增加拦截器,重点介绍以下两个
- ServiceTaskHandlerInterceptor
- 在进行state 处理前执行DbAndReportTcStateLogStore#recordStateStarted方法,保存当前state状态到表state_inst中,同时向TC注册分支事务DbAndReportTcStateLogStore#branchRegister
- 在state处理完成后,会执行DbAndReportTcStateLogStore#recordStateFinished方法,更新表state_inst中state的状态,同时向TC发送report消息,具体方法在SagaTransactionalTemplate#branchReport
- EndStateRouterInterceptor,在执行EndStateRouter后,会执行拦截器中postRoute方法,该方法会执行EngineUtils.endStateMachine,在endStateMachine方法中会调用DbAndReportTcStateLogStore#recordStateMachineFinished来更新state_machine_inst表中状态机实例状态,同时调用SagaTransactionalTemplate#reportTransaction来通知TC该事务已完成
3. 异常处理:
- 当状态机模版不是一个完备的,即有可能分支路由后不能到终止state,则会抛异常来终止
- 在执行过程中遇到不能处理的异常,则会抛异常来终止
4.TC消息处理:
- beginTransaction,发起全局事务
- branchRegister,进行分支注册
- BranchReportRequest消息,主要执行coordinator.AbstractCore#branchReport方法,修改表branch_table中对应分支事务的状态
- SagaCore#doGlobalReport,在全局事务report时,如果是状态是终止状态后,则会更新全局事务的状态,如果不是终止状态,则会更改事务状态,来进行重试