描述
seata是分布式事务解决方案。分布式事务是包含若干分支事务的全局事务。如果各分支事务提交成功,则全局事务提交。如果各分支有一个执行失败,则全局事务回滚。
分布式事务处理过程抽象为事务管理器(Transaction Manager)、分支事务(Resource Manager )、事务协调器(Transaction Coordinator)。
TC作为服务端部署,协调RM的提交或回滚。RM和TM集成作为客户端部署。如果有注册中心,则TC、RM、TM都需要向注册中心注册。
适合在MVC框架中,在service层方法上添加@GlobalTransactional
开启全局事务,然后service中用到的dao层是分支事务
测试环境
下面使用seata-samples中springboot-mybatis项目做测试,seata-simple该项目依赖很多,可以将不需要的子项目删除再测试。还需要下载seata-server。
初始化
初始化DataSourceProxy
中,缓存DataSourceProxy,生成ResourceId并向TC注册资源。
一阶段
TM
在方法上添加注解@GlobalTransactional
开启分布式事务。该注解由GlobalTransactionalInterceptor
拦截处理。由下图可知TM处理流程:
1.开启全局事务,向TC注册全局事务并返回XID
2.如果业务执行成功,通知TC全局事务提交
3.如果业务执行失败,通知TC全局事务回滚
4.清除内存中XID
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
DefaultGlobalTransaction
定义了提交、回滚方法。执行操作的角色有发起者(Launcher)、参与者(Participant)。只有发起者才可以执行begin、commit、rollback。
RM
1.向TC注册分支事务
2.插入回滚日志
3.提交本地事务
4.通知TC本地事务执行状态
5.清空内存中XID、branchId
AT模式主要封装了数据库操作。DataSource、Connection、Statement封装为DataSourceProxy、ConnectionProxy、StatementProxy。
执行sql时,ConnectionProxy创建Statement包装类StatementProxy对象。StatementProxy创建Executor。Executor中解析目标sql,再创建beforeImage,执行sql语句、创建afterImage。最后ConnectionProxy提交本地事务。
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
// 设置手动提交
connectionProxy.setAutoCommit(false);
return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
// 创建beforeImage、执行sql、创建afterImage
T result = executeAutoCommitFalse(args);
// 提交本地事务
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
((ConnectionProxy) connectionProxy).getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
protected T executeAutoCommitFalse(Object[] args) throws Exception {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
private void processGlobalTransactionCommit() throws SQLException {
try {
// RM向TC注册分支事务,获取branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
if (context.hasUndoLog()) {
// 插入数据库回滚日志
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
}
// 提交本地事务
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
// RM向TC报告本地事务提交失败
report(false);
throw new SQLException(ex);
}
// RM向TC报告本地事务提交成功
report(true);
// 清空XID、branchId
context.reset();
}
二阶段
TM通知TC全局事务提交/回滚。TC通知各RM提交/回滚本地事务。
RM以RMHandlerAT
来处理TC的事务提交通知。提交全局事务时,RM将删除undo log。先将删除操作封装为任务放入AsyncWorker
中的阻塞队列中,并返回TC成功消息。AsyncWorker
中的定时器每隔1s执行删除任务。
回滚时,RM先获取undo log回滚数据,然后删除undo log。逻辑在AbstractUndoLogManager
中undo()
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
// 获取源数据库链接
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log",
xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy).getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(),
sqlUndoLog);
// 回滚数据
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
// 删除undo log
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}",
xid, branchId, State.GlobalFinished.name());
}
} else {
// 插入undo log,防止一阶段重复写入
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}",
xid, branchId, State.GlobalFinished.name());
}
}
return;
}
}
}
隔离界别
写隔离
一阶段本地事务提交前要先获取全局锁,获取到才可提交。获取全局锁被限定在一定时间范围内。
比如tx1和tx2两个事务更新数据场景。tx1先获取本地锁执行更新,然后获取全局锁去提交本地事务,本地事务提交后释放本地锁。此时全局锁还是tx1。tx2更新时,无法获取全局锁,也就无法提交本地事务。
如果tx1提交,则释放全局锁,tx2才可以执行。
如果tx1回滚,需要获取本地锁执行补偿操作,而tx2拥有本地锁。所以tx1本地事务回滚失败,并一直重试。直到tx2获取全局锁超时,释放本地锁为止。
读隔离
默认分支事务是读已提交级别或之上,全局事务是读未提交级别。如果应用需要全局隔离级别到读已提交,则是通过SELECT FOR UPDATE代理语句实现。
SELECT FOR UPDATE执行时会先获取全局锁,而也会获取数据库拍他锁。如果获取失败,就释放本地锁,然后重试。这时查询是阻塞的。直到获取全局锁。
引用
https://my.oschina.net/u/1464083/blog/3040896#h2_8
https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ
http://seata.io/zh-cn/docs/overview/what-is-seata.html
https://github.com/javagrowing/JGrowing/tree/master/%E5%88%86%E5%B8%83%E5%BC%8F/%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1