目录
- 前置阅读
- 整体架构
- 使用场景
- XA
- AT
- 运行模式
- 写隔离
- 读隔离
- 优缺点
- TCC
- 适用场景
- 优缺点
- SAGA
- 基于状态机引擎的 Saga 实现
- 优缺点
- 实践
- 服务端
- 客户端
- 原理解析
- 服务端
- 初始化
- 执行
- 客户端
- 执行流程
- SAGA状态机实现
- SAGA状态机引擎设计
- 服务端
- 高级特性
- 幂等
- Seata设计
- 防悬挂
- Seata设计
- 空回滚
- Seata设计
- 事务分组
- 高可用体现
- 幂等
- Seata VS ServiceComb
- Saga实现区别
前置阅读
- 无分布式事务基础可以简单先看分布式事务之ServiceComb,有基础可直接跳过
整体架构
- Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案
-
整体
- Seata整体分为Seata客户端对应需要分布式事务的业务服务,Seata服务端,协调分布式事务提交,回滚,数据记录
-
交互,从SEATA官方文档截下来的
- 这里面Business业务,Account账号,Order订单,Stock库存微服务,Business是服务发起方,会调用扣减库存Stock微服务,下订单Order微服务,下订单Order会调用Account微服务。Business作为发起方定义为Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。其他几个Account账号,Order订单,Stock库存微服务是属于Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。服务端Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
- 各个微服务通过XID交互,类似Skywalking的TraceId全局链路id。套路也是一样,如果是Http请求则通过拦截Http请求头获取全局XID,Dubbo请求则拦截Dubbo请求头
使用场景
XA
- 前提: 支持XA 事务的数据库,Java 应用,通过 JDBC 访问数据库
- 利用事务资源数据源对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式
- MySQL 支持的分布式事务 XA中描述了Mysql对XA的支持, 对应的Mysql的语法为,需要mysql 5.7以上并且是Innodb
# 查询
SELECT * FROM test111;
commit;
# 开始XA
XA START 'test12138_4';
# 随便搞张表测试
insert into `test111` values(9, 45, 556);
# 结束XA
XA END 'test12138_4';
# 一阶段提交
XA PREPARE 'test12138_4';
# 对应二阶段提交或者回滚
XA COMMIT 'test12138_4';
-
Mysql对XA支持图中的"事务id存储"服务是应用服务自身进行的逻辑,与 XA 协议无关
- Seata对应XA使用的例子在Seata-XA, 基本上只需要增加, 相当于对数据源做代理
@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);
// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}
-
Seata-XA官网流程,相当于代理了Mysql等XA的使用
- 优点:
- 业务无侵入
- 数据库支持广泛,多语言支持
- 缺点
- 性能差
- 数据锁定:数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来
- 协议阻塞:XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待
AT
- 官方提供的例子Seata-AT例子
- AT模式前提: 基于支持本地 ACID 事务的关系型数据库,Java 应用,通过 JDBC 访问数据库
- 整体机制: 两阶段提交协议的演变:
- 一阶段: 业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
- 二阶段: 提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿
运行模式
- AT分支事务, SQL语句, product表有id, name, since三个字段
update product set name = 'GTS' where name = 'TXC';
- 一阶段解析 SQL
- 得到 SQL 的类型UPDATE,表product,条件where name = 'TXC'等相关的信息
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据,得到数据(1, TXC, 2014)
select id, name, since from product where name = 'TXC';
- 执行业务 SQL: 更新这条记录的 name 为 'GTS'
- 插入回滚日志: 把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG(seata自定义的表,非mysql中的undo log)
- 提交前,向 TC 注册分支: 申请 product 表中,主键值等于 1 的记录的全局锁
, 全局锁做到了写隔离 - 本地事务提交: 业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交
- 将本地事务提交的结果上报给 TC
- 二阶段-提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录
- 二阶段-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,写隔离的方式可以避免
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句
update product set name = 'TXC' where id = 1;
- 提交本地事务。并把本地事务的执行结果-----分支事务回滚的结果, 上报给 TC
写隔离
-
原则: 一阶段本地事务提交前,需要确保先拿到全局锁 。拿不到全局锁,不能提交本地事务。拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁
- 两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁
-
tx1 二阶段全局提交,释放全局锁 。tx2 拿到全局锁提交本地事务
- 如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚,此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题
- 使用方式
- 可以在tx2也加上@GlobalTransactional,就会处理全局锁,有全局锁时直接抛异常
-
可以在tx2上加上@GlobalLock然后查询语句加上select for update,加上select for update后锁冲突更温和些。如果只有@GlobalLock,检查到全局锁,则立刻抛出异常,也许再“坚持”那么一下,全局锁就释放了,抛出异常岂不可惜了。
在updateA()中可以通过select for update获得最新的A,接着再做更新
读隔离
- Seata 的方式是通过 SELECT FOR UPDATE 语句的代理,SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回
优缺点
- 优点
- 实现简单,基本0代码耦合,只需要在TM的地方加一个全局事务注解即可
- 性能优秀,第一阶段则释放本地锁
- 使用简单,学习成本低,微服务可直接集成进来
- 缺点
- 只支持数据库的回滚,涉及中间件,第三方等无法支持
TCC
- 官方提供的例子Seata-TCC例子
-
对应代码展示(非官网提供的例子), seata-tcc是业务发起方即TM,seata-tcc-order是seata-tcc的一个业务执行分支即RM
- seata-tcc发起三个RM,tccActionOne.prepare是RM角色;tccActionTwo.prepare也是RM角色代码与tccActionOne类似,注意看类和方法上的注解;orderRemoteService.createOrder是远程调用,远程调用seata-tcc-order,测试用的httpClient并将XID通过请求头形成传递
@GlobalTransactional
public String doTransactionCommit() {
// 第一个TCC 事务参与者, 事务参与者, 模拟本地事务
boolean result = tccActionOne.prepare(null, 1);
if (!result) {
throw new RuntimeException("TccActionOne failed.");
}
List list = new ArrayList();
list.add("c1");
list.add("c2");
// 第二个TCC 模拟redis
result = tccActionTwo.prepare(null, "two", list);
if (!result) {
throw new RuntimeException("TccActionTwo failed.");
}
// 第三个TCC,远程调用,order服务注册RM, header透传xid非常重要
orderRemoteService.createOrder();
return RootContext.getXID();
}
@LocalTCC
public interface TccActionOne {
@TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
# 远程调用seata-tcc-order服务对应的实现
@LocalTCC
public interface OrderService {
@TwoPhaseBusinessAction(name = "OrderService", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
-
官网图示
- 流程分析:
- seata-tcc中业务流程@GlobalTransactional标识seata-tcc是TM,seata-tcc-,seata-tcc-order中@LocalTCC标识是RM,所以代码里面有三个RM,一个TM
- 代码执行到 doTransactionCommit时,@GlobalTransactional注解对应拦截器会TM会将开启全局事务请求发送到服务端TC
- 执行到tccActionOne.prepare时对应注解会注册RM分支事务到TC服务
- 执行到tccActionTwo.prepare时对应注解会注册RM分支事务到TC服务
- 执行到orderRemoteService.createOrder时,由于seata-tcc透传了XID,所以seata-tcc-order接收http请求时从请求头拦截到XID,在执行到orderService.prepare时对应注解会注册RM分支事务到TC服务
- 执行完后doTransactionCommit向TC发送提交coomit或者回滚rollback请求
- TC向各个RM发送执行提交或者回滚对应方法
适用场景
- 异步确保型 TCC 解决方案, 比如支付宝的担保交易,会员注册发邮件
- 由于从业务服务消费消息是一个异步的过程,执行时间不确定,可能会导致不一致时间窗口增加。因此,异步确保性 TCC 分布式事务解决方案只适用于对最终一致性时间敏感度较低的一些被动型业务
- 例子: 在担保交易场景中,七天以后才需要将资金从中间账户划拨给商户,中间账户并不需要对外展示,在执行完支付服务的第一阶段Prepare后,就可以认为本次交易的支付环节已经完成,并向用户和商户返回支付成功的结果,并不需要马上执行支付服务二阶段的 Commit 接口,等到低锋期时,再慢慢消化,异步地执行。Try 接口扣除用户可用资金,转移到预冻结资金,Commit 接口扣除预冻结资金
- 通用型 TCC 解决方案,适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务
-
最典型的 TCC 分布式事务模型实现,所有从业务服务都需要参与到主业务服务的决策当中,从业务服务是同步调用,还有种场景就是转账A给B转账,A和B账号加入分库了,那也需要分布式事务,A的prepare预扣除,B的prepare不需要执行业务逻辑,当然需要处理一些比如空回滚,防悬挂,幂等等非业务性逻辑
- 补偿型 TCC 解决方案
- 由于存在回滚补偿失败的情况,补偿型 TCC 分布式事务解决方案只适用于一些并发冲突较少或者需要与外部交互的业务,这些外部业务不属于被动型业务,其执行结果会影响主业务服务的决策,比如机票代理商的机票预订服务
-
定需要中转的机票肯定希望要么都成功或者都失败
优缺点
- 不与具体的服务框架耦合
- 与底层 RPC 协议无关
- 与底层存储介质无关
- 可以灵活选择业务资源的锁定粒度,减少资源锁持有时间
- 可扩展性好
SAGA
- 官方提供的例子Seata-SAGA例子
- Saga模式是SEATA提供的长事务解决方案,一般不适用与类似TCC金融核心业务, 更适用于非核心业务,比如聚合业务, 渠道层、产品层、集成层的系统,参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
基于状态机引擎的 Saga 实现
- 官网的例子可以感受下
- 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
- 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
-
可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
优缺点
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
- 缺点: 不保证隔离性, 由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 然后给用户B扣减余额, 如果在给A用户充值成功, 在事务提交以前, A用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了(A, B用户是分库的Key, 这时候在不同的库所以需要分布式事务)。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款
实践
- 以springboot视角实践, 官网的例子是默认用file形式,我们改用springboot加上Nacos注册中心配置中心的形式,服务端也用Nacos注册中心配置中心
服务端
- seata-github源码下载下来
- 本文注册中心,配置中心使用的nacos,所以nacos需要先运行起来
-
在script包底下: nacos-config.py和nacos-config.sh把username和password设置成nacos对应的账号密码,然后执行脚本,把配置弄上去
- script包底下: server/db/mysql.sql脚本执行
- 在server包底下:
pom文件: mysql-connector-java我这边直接增加版本
<version>8.0.18</version>
- 在server包下改配置,改成配置中心nacos和配置中心nacos,注意数据库密码改成你自己的
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
seata:
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
username: nacos
password: nacos
cluster: default
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key: ""
#secret-key: ""
data-id: seataServer.properties
consul:
server-addr: 127.0.0.1:8500
acl-token:
key: seata.properties
apollo:
appId: seata-server
apollo-meta: http://192.168.1.204:8801
apollo-config-service: http://192.168.1.204:8080
namespace: application
apollo-access-key-secret:
cluster: seata
zk:
server-addr: 127.0.0.1:2181
session-timeout: 6000
connect-timeout: 2000
username:
password:
node-path: /seata/seata.properties
etcd3:
server-addr: http://localhost:2379
key: seata.properties
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 30.240.*
nacos:
cluster: default
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
username: nacos
password: nacos
application: seata-server
eureka:
service-url: http://localhost:8761/eureka
application: default
weight: 1
redis:
server-addr: localhost:6379
db: 0
password:
cluster: default
timeout: 0
zk:
cluster: default
server-addr: 127.0.0.1:2181
session-timeout: 6000
connect-timeout: 2000
username: ""
password: ""
consul:
cluster: default
server-addr: 127.0.0.1:8500
acl-token:
etcd3:
cluster: default
server-addr: http://localhost:2379
sofa:
server-addr: 127.0.0.1:9603
application: default
region: DEFAULT_ZONE
datacenter: DefaultDataCenter
cluster: default
group: SEATA_GROUP
address-wait-time: 3000
server:
service-port: 8091 #If not configured, the default is '${server.port} + 1000'
max-commit-retry-timeout: -1
max-rollback-retry-timeout: -1
rollback-retry-timeout-unlock-enable: false
enable-check-auth: true
enable-parallel-request-handle: true
retry-dead-threshold: 130000
xaer-nota-retry-timeout: 60000
recovery:
handle-all-session-period: 1000
undo:
log-save-days: 7
log-delete-period: 86400000
session:
branch-async-queue-size: 5000 #branch async remove queue size
enable-branch-async-remove: false #enable to asynchronous remove branchSession
store:
# support: file 、 db 、 redis
mode: db
session:
mode: db
lock:
mode: db
file:
dir: sessionStore
max-branch-session-size: 16384
max-global-session-size: 512
file-write-buffer-cache-size: 16384
session-reload-read-size: 100
flush-disk-mode: async
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
user: root
password: ****你的数据库密码
min-conn: 5
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 100
max-wait: 5000
redis:
mode: single
database: 0
min-conn: 1
max-conn: 10
password:
max-total: 100
query-limit: 100
single:
host: 127.0.0.1
port: 6379
sentinel:
master-name:
sentinel-hosts:
metrics:
enabled: false
registry-type: compact
exporter-list: prometheus
exporter-prometheus-port: 9898
transport:
rpc-tc-request-timeout: 30000
enable-tc-server-batch-send-response: false
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
boss-thread-size: 1
console:
user:
username: seata
password: seata
客户端
-
结构长这样, 顺序是seata-tcc调用seata-tcc-order
- 父pom文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xxxxx</groupId>
<artifactId>seata</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>seata</name>
<properties>
<java.version>1.8</java.version>
<seata-spring-boot.version>1.4.0</seata-spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata-spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.0.1</version>
</dependency>
<!--springboot相关-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
- seata-tcc pom文件
<parent>
<groupId>com.seeger</groupId>
<artifactId>seata</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>seata-tcc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-tcc</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>
- seata-tcc-order pom文件
<parent>
<groupId>com.seeger</groupId>
<artifactId>seata</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>seata-tcc-order</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-tcc-order</name>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies/>
- 两个客户端的application
spring:
application:
name: springboot-tcc-order-sample
server:
port: 8081
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: seata-demo
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
username: nacos
password: nacos
application: seata-server
service:
vgroup-mapping:
seata-demo: default
disable-global-transaction: false
- 然后就可以根据官网提供的tcc例子进行改造,区别在于要在例子TccActionOne上面加上@LocalTCC,这里是使用seata-tcc手动httpclient调用seata-tcc-order, 请求头需要加上 header.put("TX_XID", RootContext.getXID()),以支持分布式事务的跨服务实例传播
原理解析
服务端
- 启动源码在Server底下,springboot启动,然后ServerRunner -> Server, ServerRunner实现了CommandLineRunner, 版本1.6.0快照
初始化
- Server类中
public static void start(String[] args) {
// 1. 第一步
ParameterParser parameterParser = new ParameterParser(args);
// 2. 第二步
MetricsManager.get().init();
// 3. 第三步
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
// 4. 第四步
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
// 5. 第五步
UUIDGenerator.init(parameterParser.getServerNode());
// 6. 第六步
SessionHolder.init(parameterParser.getSessionStoreMode());
// 7. 第七步
LockerManagerFactory.init(parameterParser.getLockStoreMode());
// 8. 第八步
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// 9. 第九步
ServerRunner.addDisposable(coordinator);
// 10. 第十步
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
// 11. 第十一步
nettyRemotingServer.init();
}
- 第一步: 这一步是解析启动以及配置文件的各种配置参数, 其中如果你配置了从配置中心读取,也可以读取到,主要是利用了SPI机制,决定是从哪里读取, 源码config底下有各种配置中心读取的SPI实现类
- 第二步: metrics相关,这里是使用SPI机制获取Registry实例对象
- 第三步: 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用
-
第四步: 初始化一个线程池,创建NettyRemotingServer实例,NettyRemotingServer是一个基于Netty实现的RPC框架,此时并没有初始化,NettyRemotingServer负责与客户端SDK中的TM、RM进行网络通信,AbstractNettyRemotingClient继承自AbstractNettyRemoting,抽象的很好
- 第五步:UUIDGenerator初始化,UUIDGenerator基于雪花算法实现,
用于生成全局事务、分支事务的id。多个Server实例配置不同的ServerNode,保证id的唯一性,seata解决时间回拨的方式是采用了报错上次时间戳,如果相同则,线程阻塞等待5ms - 第六步:SessionHodler负责事务日志状态的持久化存储,当前支持file、db、redis三种存储模式,集群部署模式要使用db或redis模式
- 第七步:初始化锁的模式
- 第八步:创建初始化DefaultCoordinator实例,DefaultCoordinator是TC的核心事务逻辑处理类,底层包含了AT、TCC、SAGA等不同事务类型的逻辑处理,提交,回滚,超时等核心,初始化时, 开启了很多定时任务
public void init() {
retryRollbacking.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
- 第九步:ShutdownHook
- 第十步:XID处理,XID是根据UUIDGenerator.generateUUID()链接而来
- 第十一步: 初始化Netty,开始监听端口并阻塞, 不同请求有不同类型,不同类型有不同的process,属于同一个handle(ServerHandler), 具体处理逻辑抽象都AbstractNettyRemoting.processMessage
执行
- 客户端和服务端网络通信: 服务端接收数据是通过Netty,AbstractNettyRemoting.processMessage会将处理数据分发到对应Process, 比如注册请求就是RegRmProcessor.process处理,服务端发送响应是通过channel.writeAndFlush
- 执行回滚或者提交,或者超时处理,DefaultCoordinator会定时轮训看下是否有需要提交或者回滚的,或者超时回滚的,有需要的则调用对应的客户端,进行回滚提交,超时回滚等
客户端
初始化
- seata-spring-boot-starter,启动Config类SeataAutoConfiguration,注入了GlobalTransactionScanner和seataAutoDataSourceProxyCreator。GlobalTransactionScanner继承于AutoProxyCreator是AOP实现,这样就通过注解实现全局事务拦截。TccActionInterceptor和GlobalTransactionalInterceptor(AT模式)。GloabalTransactionScanner还负责TM、RM的初始化工作,是在initClient方法中实现的,这块主要是用Netty初始化客户端与TC服务端连接并注册响应服务端的process
执行流程
- GlobalTransactionalInterceptor会拦截GlobalTransactional,发给服务端开启全局事务
- TCC模式的话,TccActionInterceptor会拦截注册分支事务
- TransactionPropagationIntercepter会从上下文请求头获取TX_XID,保证RM分支事务是加入全局TM
- 判断是否TCC需要有@LocalTcc注解,TCCBeanParserUtils判断
SAGA状态机实现
- 状态图是先执行 stateA, 再执行 stataB,然后执行 stateC
- 状态的执行是基于事件驱动的模型,stataA 执行完成后,会产生路由消息放入 EventQueue,事件消费端从 EventQueue 取出消息,执行 stateB
- 在整个状态机启动时会调用 Seata Server 开启分布式事务,并生产 xid, 然后记录状态机实例启动事件到本地数据库(seata三张表)
- 当执行到一个状态时会调用 Seata Server 注册分支事务,并生产 branchId, 然后记录状态实例开始执行事件到本地数据库
- 当一个状态执行完成后会记录状态实例执行结束事件到本地数据库, 然后调用 Seata Server 上报分支事务的状态
- 当整个状态机执行完成,会记录状态机实例执行完成事件到本地数据库, 然后调用 Seata Server 提交或回滚分布式事务
SAGA状态机引擎设计
- 状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:
- Eventing 层:实现事件驱动架构, 可以压入事件, 并由消费端消费事件, 本层不关心事件是什么消费端执行什么,由上层实现
- ProcessController 层:由于上层的 Eventing 驱动一个“空”流程执行的执行,"state"的行为和路由都未实现,由上层实现;
PS:基于以上两层理论上可以自定义扩展任何"流程"引擎 - StateMachineEngine 层:实现状态机引擎每种 state 的行为和路由逻辑;
提供 API、状态机语言仓库
高级特性
幂等
- 原服务与补偿服务都需要保证幂等性, 由于网络可能超时, 可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新, 可以通过设置唯一健(XID + BrancId),请求过来时判断这个唯一健(XID + BrancId)是否执行过
Seata处理方式
- TCCResourceManager类中
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
//省略判断
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
//省略判断
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
//注解 useTCCFence 属性是否设置为 true
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
//省略逻辑
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
//省略
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}
- 执行分支事务提交方法时,首先判断 useTCCFence 属性是否为 true,如果为 true,则走 TCCFenceHandler 类中的 commitFence 逻辑,否则走普通提交逻辑
- TCCFenceHandler 类中的 commitFence 方法调用了 TCCFenceHandler 类的 commitFence 方法
public static boolean commitFence(Method commitMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
if (tccFenceDO == null) {
throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
FrameworkErrorCode.RecordAlreadyExists);
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
}
return false;
}
return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}
- 从代码中可以看到,提交事务时首先会判断 tcc_fence_log 表中是否已经有记录,如果有记录,则判断事务执行状态并返回。这样如果判断到事务的状态已经是 STATUS_COMMITTED,就不会再次提交,保证了幂等。如果 tcc_fence_log 表中没有记录,则插入一条记录,供后面重试时判断。Rollback 的逻辑跟 commit 类似,逻辑在类 TCCFenceHandler 的 rollbackFence 方法
防悬挂
- 补偿服务比原服务先执行
- 出现原因:原服务超时(拥堵), Saga事务回滚,触发回滚, 拥堵的原服务到达
- 解决方案: 要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行
Seata处理方式
- 执行 Rollback 方法时先判断 tcc_fence_log 是否存在当前 xid 的记录,如果没有则向 tcc_fence_log 表插入一条记录,状态是 STATUS_SUSPENDED,并且不再执行回滚操作, 而后面执行 try 阶段方法时首先会向 tcc_fence_log 表插入一条当前 xid 的记录,这样就造成了主键冲突
空回滚
- 原服务未执行,补偿服务执行了
- 出现原因:原服务超时(丢包), Saga事务触发回滚, 未收到原服务请求,先收到补偿请求
- 解决方案: 服务设计时需要允许空补偿, 即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来
Seata处理方式
- Seata 的解决方案是在 try 阶段 往 tcc_fence_log 表插入一条记录,status 字段值是 STATUS_TRIED,在 Rollback 阶段判断记录是否存在,如果不存在,则不执行回滚操作
// TCCFenceHandler 类
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
if (result) {
return targetCallback.execute();
} else {
throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
FrameworkErrorCode.InsertRecordError);
}
} catch (TCCFenceException e) {
//省略
} catch (Throwable t) {
//省略
}
});
}
- 回滚时, 把 tcc_fence_log 表记录的 status 字段值从 STATUS_TRIED 改为 STATUS_ROLLBACKED,如果更新成功,就执行回滚逻辑
//TCCFenceHandler 类
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args, String actionName) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
// non_rollback
if (tccFenceDO == null) {
//不执行回滚逻辑
return true;
} else {
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
}
return false;
}
}
return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}
事务分组
- 事务分组:seata的资源逻辑,可以按微服务的需要,在应用程序(客户端)对自行定义事务分组,每组取一个名字
- 集群:seata-server服务端一个或多个节点组成的集群cluster。 应用程序(客户端)使用时需要指定事务逻辑分组与Seata服务端集群的映射关系
- 事务分组如何找到后端Seata集群:
- 首先应用程序(客户端)中配置了事务分组(GlobalTransactionScanner 构造方法的txServiceGroup参数)。若应用程序是SpringBoot则通过seata.tx-service-group 配置
- 应用程序(客户端)会通过用户配置的配置中心去寻找service.vgroupMapping .[事务分组配置项],取得配置项的值就是TC集群的名称。若应用程序是SpringBoot则通过seata.service.vgroup-mapping.事务分组名=集群名称 配置
- 拿到集群名称程序通过一定的前后缀+集群名称去构造服务名,各配置中心的服务名实现不同(前提是Seata-Server已经完成服务注册,且Seata-Server向注册中心报告cluster名与应用程序(客户端)配置的集群名称一致)
- 拿到服务名去相应的注册中心去拉取相应服务名的服务列表,获得后端真实的TC服务列表(即Seata-Server集群节点列表)
高可用体现
-
TC的异地多机房容灾: projectA所有微服务的事务分组tx-service-group设置为:projectA,projectA正常情况下使用guangzhou的TC集群, 异常时可通过修改配置到Shanghai集群,当然也可以加个定时任务扫描集群健康,自动改配置(通过http接口改Nacos, Apollo等配置中心数据)
-
单一环境多应用接入: 不同应用不同集群
-
client的精细化控制: 冷热集群分离
- Seata的预发与生产隔离
Seata VS ServiceComb Package
SAGA实现的区别
- Seata基于状态机定义, 可以用可视化工具来定义业务流程,标准化,可读性高,可实现服务编排的功能,天然可以使用 Actor 模型或 SEDA 架构等异步处理引擎来执行,提高整体吞吐量, 但理解成本更高, 改造现有业务,对业务侵入性高, 引擎实现成本高
- ServiceComb Saga基于注解+拦截器实现,理解成本低, 方便接入现有业务,框架无法提供业务状态管理 ,以实现宕机恢复后的“向前重试”,因为无法恢复线程上下文
- ServiceComb在请求开始和请求结束都需要将状态发给服务端Alapha, 而Seata只需要开始时注册分支事务发送服务端TC,两者都是由业务侧全局事务开启处决定提交或者回滚,两者服务端有超时机制,防止业务侧宕机处理无法处理
- ServiceComb服务端是基于AKKA实现有限状态机
- 网络通信ServiceComb是基于GRPC, Seata是直接基于Netty
- 持久层ServiceComb是基于JPA,Seata直接Jdbc
- Seata灵活性高,可配置性强,可以自由选择注册中心,配置中心等,star多很多,活跃性高
参考文章
- 基于 Seata Saga 设计更有弹性的金融应用
- 分布式事务中间件 Seata 的设计原理
- 【分布式事务Seata源码解读二】Client端启动流程
- Seata应用侧启动过程剖析——RM & TM如何与TC建立连接
- 【分布式事务Seata源码解读一】Server端启动流程
- 深度剖析一站式分布式事务方案Seata-Server
- TCC 适用模型与适用场景分析
- TCC 理论及设计实现指南介绍
- 详解 Seata AT 模式事务隔离级别与全局锁设计
- seata官方文档
- seata官网之SAGA实现
- MySQL 支持的分布式事务 XA
- 分布式事务:XA(2PC、3PC)和TCC
- 转账是怎么保证数据一致性的
- Seata 在 1.5.1 版本解决了 TCC 模式的幂等、悬挂和空回滚问题