分布式事务框架FESCAR执行过程-AT

本文主要从代码层面解析FESCAR的执行过程,原理架构的图文解析可以见github 项目主页https://github.com/alibaba/fescar/wiki/%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E5%BC%80%E6%BA%90%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1%E8%A7%A3%E5%86%B3%E6%96%B9%E6%A1%88-FESCAR
源代码可以见https://github.com/alibaba/fescar

以git主页上面的quickstart开始分析(purchase包含若干分布式的事务service),号称一个注解即可搞定分布式事务

We just need an annotation @GlobalTransactional on business method:

 @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        ......
    }

普通场景,我们使用spring管理本地事务使用@Transactional注解,FESCAR使用的是@GlobalTransactional注解,那么我们就从@GlobalTransactional注解入手 ,显然这个注解通过SpringAop起作用,可以看GlobalTransactionalInterceptor代码

invoke方法中,其实真正的关键就是transactionalTemplate.execute();
也就是说,加了@GlobalTransactional注解的方法,会在这个transactionalTemplate.execute()方法中执行真正的业务方法

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
        if (anno != null) {
            try {
                return transactionalTemplate.execute(new TransactionalExecutor() {
                    @Override
                    public Object execute() throws Throwable {
                        return methodInvocation.proceed();
                    }

                    @Override
                    public int timeout() {
                        return anno.timeoutMills();
                    }

                    @Override
                    public String name() {
                        if (anno.name() != null) {
                            return anno.name();
                        }
                        return formatMethod(methodInvocation.getMethod());
                    }
                });
            } catch (TransactionalExecutor.ExecutionException e) { }
        return methodInvocation.proceed();
    }

逻辑比较清楚,加了这个@GlobalTransactional注解的方法,通过transactionalTemplate.execute来执行。那么transactionalTemplate.execute()到底做了什么呢?

见TransactionalTemplate类


 public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

       // 1. get or create a transaction
       GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

       // 2. begin transaction
       try {
           tx.begin(business.timeout(), business.name());

       } catch (TransactionException txe) {
           throw new TransactionalExecutor.ExecutionException(tx, txe,
               TransactionalExecutor.Code.BeginFailure);

       }

       Object rs = null;
       try {

           // Do Your Business
           rs = business.execute();

       } catch (Throwable ex) {

           // 3. any business exception, rollback.
           try {
               tx.rollback();

               // 3.1 Successfully rolled back
               throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

           } catch (TransactionException txe) {
               // 3.2 Failed to rollback
               throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.RollbackFailure, ex);

           }

       }

       // 4. everything is fine, commit.
       try {
           tx.commit();

       } catch (TransactionException txe) {
           // 4.1 Failed to commit
           throw new TransactionalExecutor.ExecutionException(tx, txe,
               TransactionalExecutor.Code.CommitFailure);

       }
       return rs;
   }

}

这个框架注释不是很多,但是这个类是例外,1,2,3,4,5很清楚。
简单说就是
1.获取或者创建一个GlobalTransaction;
2.开始事务
3.执行分布式方法,
4.有异常回滚
5.无异常提交。

下面分别说一下commit和rollback

此处的commit实质上什么也没做,只是维护一下GlobalSession中的分支状态,可以认为是个假的commit,提交这个动作其实第三步execute方法执行分支事务方法的时候就已经做了,也就是说,分布式事务在各自执行的时候就已经提交了,感兴趣的可以看代码DefaultCore#doGlobalCommit,以quickstart为例,在分支方法中打断点,可以看到,断点之前的其他分支方法里面已经写了DB且commit了。

那么如果某一个分支事务执行失败,其他已经提交了的分支事务还怎么回滚呢?看下面
rollback的流程,可以看DefaultCore.doGlobalRollback方法
其中最关键的就是UndoLogManager类,代码很长,感兴趣的自己研究,具体就是,这个undo,其实就是通过undolog来反向生成一个回滚sql,然后执行这个回滚sql来达到rollback的效果。

public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType());

        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        try {
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            conn.setAutoCommit(false);

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            while (rs.next()) {
                Blob b = rs.getBlob("rollback_info");
                String rollbackInfo = StringUtils.blob2string(b);
                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);

                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                    sqlUndoLog.setTableMeta(tableMeta);
                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                    undoExecutor.executeOn(conn);
                }

            }
            deleteUndoLog(xid, branchId, conn);

            conn.commit();

        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e);

        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }

    }

那么这个回滚sql是如何生成的呢,我们以insert的undo为例,可以看到,其实就是通过生成一条delete语句来进行回滚。
响应的,delete的事务,回滚就是insert一条。

public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {

    @Override
    protected String buildUndoSQL() {
        TableRecords afterImage = sqlUndoLog.getAfterImage();
        List<Row> afterImageRows = afterImage.getRows();
        if (afterImageRows == null || afterImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = afterImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer("DELETE FROM " + sqlUndoLog.getTableName());
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ? ");
            }

        }
        return mainSQL.append(where).toString();
    }

总结一下,FESCAR其实就是说,把分布式事务当作一批branch本地事务来执行,branch各自执行,各自提交,假如所有branch都成功,那么commit的时候,维护一个状态即可,因为大家已经提交了;假如某一个branch执行失败,那么进行回滚,回滚的方式是根据之前的undolog生成一个反向的回滚sql,各个branch分别执行自己的回滚sql来达到回滚的效果。
这与XA的两阶段提交相比,优势在哪呢?1.不需要数据库支持XA协议,因为这个proxy是在应用层面。限制更少。2.减少了事务持锁时间,从而提高了事务的并发度

问题点:由于追求尽量少的持锁时间,那就需要考虑回滚场景下,分支事务提交到回滚sql执行之间是有时间差的,这个时间区间内的数据状态可以认为是一种脏数据。对于这种数据的读写都需要注意,读会产生“脏读”,写需要加乐观锁,不然会产生丢失更新。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容