seata AT模式

描述

seata是分布式事务解决方案。分布式事务是包含若干分支事务的全局事务。如果各分支事务提交成功,则全局事务提交。如果各分支有一个执行失败,则全局事务回滚。
分布式事务处理过程抽象为事务管理器(Transaction Manager)、分支事务(Resource Manager )、事务协调器(Transaction Coordinator)。
TC作为服务端部署,协调RM的提交或回滚。RM和TM集成作为客户端部署。如果有注册中心,则TC、RM、TM都需要向注册中心注册。

适合在MVC框架中,在service层方法上添加@GlobalTransactional开启全局事务,然后service中用到的dao层是分支事务

测试环境

下面使用seata-samples中springboot-mybatis项目做测试,seata-simple该项目依赖很多,可以将不需要的子项目删除再测试。还需要下载seata-server

初始化

初始化DataSourceProxy中,缓存DataSourceProxy,生成ResourceId并向TC注册资源。

一阶段

TM

在方法上添加注解@GlobalTransactional开启分布式事务。该注解由GlobalTransactionalInterceptor拦截处理。由下图可知TM处理流程:
1.开启全局事务,向TC注册全局事务并返回XID
2.如果业务执行成功,通知TC全局事务提交
3.如果业务执行失败,通知TC全局事务回滚
4.清除内存中XID

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

DefaultGlobalTransaction定义了提交、回滚方法。执行操作的角色有发起者(Launcher)、参与者(Participant)。只有发起者才可以执行begin、commit、rollback。

RM

1.向TC注册分支事务
2.插入回滚日志
3.提交本地事务
4.通知TC本地事务执行状态
5.清空内存中XID、branchId

AT模式主要封装了数据库操作。DataSource、Connection、Statement封装为DataSourceProxy、ConnectionProxy、StatementProxy。

执行sql时,ConnectionProxy创建Statement包装类StatementProxy对象。StatementProxy创建Executor。Executor中解析目标sql,再创建beforeImage,执行sql语句、创建afterImage。最后ConnectionProxy提交本地事务。

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // 设置手动提交
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
            // 创建beforeImage、执行sql、创建afterImage
            T result = executeAutoCommitFalse(args);
            // 提交本地事务
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        ((ConnectionProxy) connectionProxy).getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}


protected T executeAutoCommitFalse(Object[] args) throws Exception {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}
private void processGlobalTransactionCommit() throws SQLException {
    try {
        // RM向TC注册分支事务,获取branchId
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }

    try {
        if (context.hasUndoLog()) {
            // 插入数据库回滚日志
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        }
        // 提交本地事务
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // RM向TC报告本地事务提交失败
        report(false);
        throw new SQLException(ex);
    }
    // RM向TC报告本地事务提交成功
    report(true);
    // 清空XID、branchId
    context.reset();
}

二阶段

TM通知TC全局事务提交/回滚。TC通知各RM提交/回滚本地事务。
RM以RMHandlerAT来处理TC的事务提交通知。提交全局事务时,RM将删除undo log。先将删除操作封装为任务放入AsyncWorker中的阻塞队列中,并返回TC成功消息。AsyncWorker中的定时器每隔1s执行删除任务。

回滚时,RM先获取undo log回滚数据,然后删除undo log。逻辑在AbstractUndoLogManager中undo()



public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            // 获取源数据库链接
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

                // It is possible that the server repeatedly sends a rollback request to roll back
                // the same branch transaction to multiple processes,
                // ensuring that only the undo_log in the normal state is processed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log",
                                xid, branchId, state);
                    }
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
                byte[] rollbackInfo = BlobUtils.blob2Bytes(b);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
                        UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy).getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(),
                                sqlUndoLog);
                        // 回滚数据
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }

            // If undo_log exists, it means that the branch transaction has completed the first phase,
            // we can directly roll back and clean the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction,
            // causing undo_log not to be written to the database.
            // For example, the business processing timeout, the global transaction is the initiator rolls back.
            // To ensure data consistency, we can insert an undo_log with GlobalFinished state
            // to prevent the local transaction of the first phase of other programs from being correctly submitted.
            // See https://github.com/seata/seata/issues/489

            if (exists) {
                // 删除undo log
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}",
                            xid, branchId, State.GlobalFinished.name());
                }
            } else {
                // 插入undo log,防止一阶段重复写入
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}",
                            xid, branchId, State.GlobalFinished.name());
                }
            }

            return;
        }
    }
}

隔离界别

写隔离

一阶段本地事务提交前要先获取全局锁,获取到才可提交。获取全局锁被限定在一定时间范围内。
比如tx1和tx2两个事务更新数据场景。tx1先获取本地锁执行更新,然后获取全局锁去提交本地事务,本地事务提交后释放本地锁。此时全局锁还是tx1。tx2更新时,无法获取全局锁,也就无法提交本地事务。

如果tx1提交,则释放全局锁,tx2才可以执行。
如果tx1回滚,需要获取本地锁执行补偿操作,而tx2拥有本地锁。所以tx1本地事务回滚失败,并一直重试。直到tx2获取全局锁超时,释放本地锁为止。

读隔离

默认分支事务是读已提交级别或之上,全局事务是读未提交级别。如果应用需要全局隔离级别到读已提交,则是通过SELECT FOR UPDATE代理语句实现。
SELECT FOR UPDATE执行时会先获取全局锁,而也会获取数据库拍他锁。如果获取失败,就释放本地锁,然后重试。这时查询是阻塞的。直到获取全局锁。

引用

https://my.oschina.net/u/1464083/blog/3040896#h2_8
https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ
http://seata.io/zh-cn/docs/overview/what-is-seata.html
https://github.com/javagrowing/JGrowing/tree/master/%E5%88%86%E5%B8%83%E5%BC%8F/%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容