分布式事务中间件 seata - RM 模块源码解读

上一篇文章,我们介绍了新一代分布式事务的 seata 的实现机制,它是基于两阶段提交模式设计的,以高效且对业务零侵入的方式,解决微服务场景下面临的分布式事务问题。我们重新温故下 seata 三大组件 TC、TM、RM的交互流程:

本文将深入到 seata 的 RM 模块源码去介绍 seata 是如何在完成分支提交和回滚的基础上又做到零侵入,进而极大方便业务方进行业务系统开发。

从配置开始解读

/**
 * The type Druid configuration.
 */
@Configuration
public class DruidConfiguration {

    @Value("${spring.datasource.druid.user}")
    private String druidUser;

    @Value("${spring.datasource.druid.password}")
    private String druidPassword;

    /**
     * Druid data source druid data source.
     *
     * @return the druid data source
     */
    @Bean(destroyMethod = "close", initMethod = "init")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    /**
     * Data source data source.
     *
     * @param druidDataSource the druid data source
     * @return the data source
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
        return dataSourceProxy;
    }
    ...
}

上面是 seata 数据源的配置,数据源采用 druid 的DruidDataSource,但实际 jdbcTemplate 执行时并不是用该数据源,而用的是 seata 对DruidDataSource的代理DataSourceProxy,所以,与 RM 相关的代码逻辑基本上都是从DataSourceProxy这个代理数据源开始的。

seata 采用 2PC 来完成分支事务的提交与回滚,具体怎么做到的呢,下面就分别介绍 Phase1、Phase2 具体做了些什么。

Phase1 - 分支(本地)事务执行

seata 将一个本地事务做为一个分布式事务分支,所以若干个分布在不同微服务中的本地事务共同组成了一个全局事务,结构如下:


那么,一个本地事务中 SQL 是如何执行呢?在 Spring 中,本质上都是从 jdbcTemplate 开始的,一般JdbcTemplate执行流程如下图所示:


由于在配置中,JdbcTemplate 数据源DruidDataSource被配置成了 seata 实现DataSourceProxy,进而控制了后续的流程。数据库连接使用的是seata 提供的ConnectionProxy,Statement 使用的是 seata 实现的StatementProxy,最终 seata 就顺理成章地实现了在本地事务执行前后增加所需要的逻辑,比如:完成分支事务的快照记录和分支事务执行状态的上报等等。

DataSourceProxy获取ConnectionProxy:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
}

ConnectionProxy获取StatmentProxy

public abstract class AbstractConnectionProxy implements Connection {
  
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

}

在获取到StatementProxy后,可以调用excute方法执行SQL了:

public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {

     @Override
    public ArrayList<Object>[] getParameters() {
        return parameters;
    }

    private void init() throws SQLException {
        int paramCount = targetStatement.getParameterMetaData().getParameterCount();
        this.parameters = new ArrayList[paramCount];
        for (int i = 0; i < paramCount; i++) {
            parameters[i] = new ArrayList<>();
        }
    }

    /**
     * Instantiates a new Prepared statement proxy.
     *
     * @param connectionProxy the connection proxy
     * @param targetStatement the target statement
     * @param targetSQL       the target sql
     * @throws SQLException the sql exception
     */
    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement,
                                  String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
        init();
    }

    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
            @Override
            public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.execute();
            }
        });
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, PreparedStatement>() {
            @Override
            public ResultSet execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.executeQuery();
            }
        });
    }

    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
            @Override
            public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.executeUpdate();
            }
        });
    }
}

而真正excute实现逻辑如下:

public class ExecuteTemplate {

    ...
    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
}
  1. 首先会检查当前本地事务是否处于全局事务中,如果不处于,直接使用默认的Statment执行,避免因引入 seata 导致非全局事务中的 SQL 执行性能下降。

2.根据 SQL 语句和数据库类型,获取 SQL 识别器SQLRecognizer

  1. 对于 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 SQL 会使用专门实现的 SQL 执行器进行处理,其它 SQL (如查询)直接是默认的PlainExecutor执行。

  2. 返回执行结果,如有异常则直接抛给上层业务代码进行处理。

再来看一下关键的 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 sql 如何执行的,先看一下具体类图结构:


为节省篇幅,选择UpdateExecutor实现源码看一下,先看入口BaseTransactionalExecutor.execute,该方法将ConnectionProxy与Xid(事务ID)进行绑定,这样后续判断当前本地事务是否处理全局事务中只需要看ConnectionProxy中 Xid 是否为空就行。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

  /**
     * The Statement proxy.
     */
    protected StatementProxy<S> statementProxy;

    ...
    @Override
    public Object execute(Object... args) throws Throwable {
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }

        if (RootContext.requireGlobalLock()) {
            statementProxy.getConnectionProxy().setGlobalLockRequire(true);
        } else {
            statementProxy.getConnectionProxy().setGlobalLockRequire(false);
        }
        return doExecute(args);
    }
   
}

然后,执行AbstractDMLBaseExecutor中实现的doExecute方法:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    /**
     * Execute auto commit false t.
     *
     * @param args the args
     * @return the t
     * @throws Throwable the throwable
     */
    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    /**
     * Execute auto commit true t.
     *
     * @param args the args
     * @return the t
     * @throws Throwable the throwable
     */
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    connectionProxy.getTargetConnection().rollback();
                    lockRetryController.sleep(lockConflict);
                }
            }

        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setAutoCommit(true);
        }
        return result;
    }

}

基本逻辑如下:

  1. 先判断是否为 Auto-Commit 模式
  2. 如果非 Auto-Commit 模式,则先查询 Update 前对应行记录的快照beforeImage,再执行 Update 语句,完成后再查询 Update 后对应行记录的快照afterImage,最后将beforeImageafterImage生成 UndoLog 追加到 Connection 上下文ConnectionContext中。(注:获取beforeImageafterImage方法在UpdateExecutor类下,一般是构造一条 Select...For Update 语句获取执行前后的行记录)。
  3. 如果是 Auto-Commit 模式,先将提交模式设置成非自动 Commit,再执行2中的逻辑,再执行connectionProxy.commit()方法,由于执行2过程和commit时都可能会出现全局锁冲突问题,增加了一个循环等待重试逻辑,最后将 connection 的模式设置成 Auto-Commit 模式。

如果本地事务执行过程中发生异常,业务上层会接收到该异常,至于是给 TM 模块返回成功还是失败,由业务上层实现决定,如果返回失败,则 TM 裁决对全局事务进行回滚;如果本地事务执行过程未发生异常,不管是非 Auto-Commit 还是 Auto-Commit 模式,最后都会调用connectionProxy.commit()对本地事务进行提交,在这里会创建分支事务、上报分支事务的状态以及将 UndoLog 持久化到undo_log表中,具体代码如下图:

    @Override
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    }

基本逻辑:

  1. 判断当前本地事务是否处于全局事务中(也就判断ConnectionContext中的 xid 是否为空)。
  2. 如果不处于全局事务中,但是有全局事务锁(即方法标注了@GlobalLock注解),则在全局事务锁中提交本地事务。
  3. 以上情况都不是,则调用targetConnection对本地事务进行 commit。
  4. 如果处于全局事务中,首先创建分支事务,再将ConnectionContext中的 UndoLog 写入到undo_log表中,然后调用targetConnection对本地事务进行 commit,将 UndoLog 与业务 SQL 一起提交,最后上报分支事务的状态(成功 or 失败),并将ConnectionContext上下文重置。

注:@GlobalLock注解的作用,用GlobalLock修饰的本地业务方法,虽然该方法并非某个全局事务下的分支事务,但是它对数据资源的操作也需要先查询全局锁,如果存在其他 Seata 全局事务正在修改,则该方法也需等待。所以,如果想要 Seata 全局事务执行期间,数据库不会被其他事务修改,则该方法需要强制添加GlobalLock注解,来将其纳入 Seata 分布式事务的管理范围。

综上所述,RM 模块通过对 JDBC 数据源进行代理,干预业务 SQL 执行过程,加入了很多流程,比如业务 SQL 解析、业务 SQL 执行前后的数据快照查询并组织成 UndoLog、全局锁检查、分支事务注册、UndoLog 写入并随本地事务一起 Commit、分支事务状态上报等。通过这种方式,seata 真正做到了对业务代码无侵入,只需要通过简单的配置,业务方就可以轻松享受 seata 所带来的功能。

Phase1 整体流程引用 seata 官方图总结如下:


Phase2 - 分支事务提交或回滚

阶段 2 完成的是全局事务的最终提交或回滚,当全局事务中所有分支事务全部完成并且都执行成功,这时 TM 会发起全局事务提交,TC 收到全全局事务提交消息后,会通知各分支事务进行提交;同理,当全局事务中所有分支事务全部完成并且某个分支事务失败了,TM 会通知 TC 协调全局事务回滚,进而 TC 通知各分支事务进行回滚。

在业务应用启动过程中,由于引入了seata 客户端,RmRpcClient会随应用一起启动,RmRpcClient采用 Netty 实现,可以接收 TC 消息和向 TC 发送消息,因此RmRpcClient是与 TC 收发消息的关键模块。

下面分成两部分来讲:分支事务提交、分去事务回滚。

1、分支事务提交

在接收到 TC 发起的全局提交消息后,RmRpcClient对通信协议的处理,会根据发送请求 Body 类型(分支提交请求或者分支回滚请求),调用不同的处理逻辑,RmMessageListener#onMessage

    @Override
    public void onMessage(long msgId, String serverAddress, Object msg, ClientMessageSender sender) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("onMessage:" + msg);
        }
        if (msg instanceof BranchCommitRequest) {
            handleBranchCommit(msgId, serverAddress, (BranchCommitRequest)msg, sender);
        } else if (msg instanceof BranchRollbackRequest) {
            handleBranchRollback(msgId, serverAddress, (BranchRollbackRequest)msg, sender);
        }
    }

    private void handleBranchRollback(long msgId, String serverAddress,
                                      BranchRollbackRequest branchRollbackRequest,
                                      ClientMessageSender sender) {
        BranchRollbackResponse resultMessage = null;
        resultMessage = (BranchRollbackResponse)handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + resultMessage);
        }
        try {
            sender.sendResponse(msgId, serverAddress, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("", "send response error", throwable);
        }
    }

    private void handleBranchCommit(long msgId, String serverAddress,
                                    BranchCommitRequest branchCommitRequest,
                                    ClientMessageSender sender) {

        BranchCommitResponse resultMessage = null;
        try {
            resultMessage = (BranchCommitResponse)handler.onRequest(branchCommitRequest, null);
            sender.sendResponse(msgId, serverAddress, resultMessage);
        } catch (Exception e) {
            LOGGER.error(FrameworkErrorCode.NetOnMessage.getErrCode(), e.getMessage(), e);
            if (resultMessage == null) {
                resultMessage = new BranchCommitResponse();
            }
            resultMessage.setResultCode(ResultCode.Failed);
            resultMessage.setMsg(e.getMessage());
            sender.sendResponse(msgId, serverAddress, resultMessage);
        }
    }

由于 TC 发起的是全局提交消息,于是它发送的消息 Body 类型为分支事务提交请求BranchCommitRequest,于是会走分支事务提交的逻辑,后面会交由RMHandlerAT来完成对分支事务的提交,分支事务提交从RMHandlerAT.doBranchCommit()开始,但最后由AsyncWorker异步 Worker 完成,直接看AsyncWorker中的代码实现:

public class AsyncWorker implements ResourceManagerInbound {

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
                + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Init.
     */
    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1,
            new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {

                    doBranchCommits();

                } catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... " + e.getMessage());

                }
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }
    ...
}

AsyncWorker#init初始化方法会在程序启动时执行,目的是开启定时任务,执行doBranchCommits()方法。我们先看branchCommit方法,它将分支事务组织成Phase2Context对象,并且放入阻塞队列ASYNC_COMMIT_BUFFER中。而doBranchCommits()方法正是从阻塞队列ASYNC_COMMIT_BUFFER中取出Phase2Context对象,从而删除 UndoLog 日志,我们看代码:

private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }

        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
            if (contextsGroupedByResourceId == null) {
                contextsGroupedByResourceId = new ArrayList<>();
                mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
            }
            contextsGroupedByResourceId.add(commitContext);

        }

        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            try {
                try {
                    DataSourceManager resourceManager = (DataSourceManager)DefaultResourceManager.get()
                        .getResourceManager(BranchType.AT);
                    DataSourceProxy dataSourceProxy = resourceManager.get(entry.getKey());
                    if (dataSourceProxy == null) {
                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                    }
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
                List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    xids.add(commitContext.xid);
                    branchIds.add(commitContext.branchId);
                    int maxSize = xids.size() > branchIds.size() ? xids.size() : branchIds.size();
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                        try {
                            UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
                        } catch (Exception ex) {
                            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                        }
                        xids.clear();
                        branchIds.clear();
                    }
                }

                if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                    return;
                }

                try {
                    UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }

            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }
        }
    }

该方法主要根据Phase2Context对象中的 Xid 和 BranchId 找到并删除对应的 Undolog 日志。

同样,对于分支事务提交也引用 seata 官方一张图作为结尾:


2. 分支事务回滚

同样,当 TC 接收到全局事务回滚的指令时,会向每个 RM 发送分支事务回滚的请求,即BranchRollbackRequest。随后调用RMHandlerAT.doBranchRollback方法,然后到了dataSourceManager.branchRollback,最后完成分支事务回滚逻辑的是UndoLogManager.undo方法。

public abstract class AbstractRMHandler extends AbstractExceptionHandler{
  
    /**
     * Do branch rollback.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }
}

public class DataSourceManager extends AbstractResourceManager implements Initialize {

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }
}

UndoLogManager.undo方法源码如下:

public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        assertDbSupport(dataSourceProxy.getDbType());

        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;

        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();

                // The entire undo process should run in a local transaction.
                conn.setAutoCommit(false);

                // Find UNDO LOG
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                    exists = true;

                    // It is possible that the server repeatedly sends a rollback request to roll back
                    // the same branch transaction to multiple processes,
                    // ensuring that only the undo_log in the normal state is processed.
                    int state = rs.getInt("log_status");
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log",
                                    xid, branchId, state);
                        }
                        return;
                    }

                    Blob b = rs.getBlob("rollback_info");
                    byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
                    BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);

                    for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                        TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(),
                            sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                }

                // If undo_log exists, it means that the branch transaction has completed the first phase,
                // we can directly roll back and clean the undo_log
                // Otherwise, it indicates that there is an exception in the branch transaction,
                // causing undo_log not to be written to the database.
                // For example, the business processing timeout, the global transaction is the initiator rolls back.
                // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                // See https://github.com/seata/seata/issues/489

                if (exists) {
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}",
                                xid, branchId, State.GlobalFinished.name());
                    }
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}",
                                xid, branchId, State.GlobalFinished.name());
                    }
                }

                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback",
                            xid, branchId);
                }
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid),
                    e);

            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }

基本逻辑为:

  1. 获取Connection对象,并设置成非 Auto-Commit 模式。
  2. 查询 UndoLog。
  3. 对 UndoLog 进行解码处理。
  4. 根据 UndoLog 反向生成业务 SQL 并执行。
  5. 删除 UndoLog,并提交本地事务。

从上面源码可以看出,整个回滚到全局事务之前状态的代码逻辑集中在如下代码中:

AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(),
                            sqlUndoLog);

首先通过UndoExecutorFactory获取到对应的UndoExecutor,然后再执行UndoExecutorexecuteOn方法完成回滚操作。目前三种类型的UndoExecutor结构如下:


undoExecutor.executeOn源码如下:

public void executeOn(Connection conn) throws SQLException {

        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        
        try {
            String undoSQL = buildUndoSQL();

            PreparedStatement undoPST = conn.prepareStatement(undoSQL);

            TableRecords undoRows = getUndoRows();

            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }

                undoPrepare(undoPST, undoValues, pkValue);

                undoPST.executeUpdate();
            }

        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }

        }

    }

该方法会根据 UndoLog,反向生成业务 SQL,设置参数并执行。

至此,整个分支事务回滚就结束了。

引入seata 官方对分支事务回滚原理介绍图作为结尾:


思考:在 Phase2 阶段,全局事务提交或者回滚的过程中,如果出现了超时或者异常情况,导致提交或者回滚失败时,seata 是如何保证分布式事务运行结果的正确性的?

其实,从上面源码中可以发现,当全局事务提交或者回滚操作处理完成之后(异常会封装异常信息),会把处理结果发送给 TC(服务端),sender.sendResponse(msgId, serverAddress, resultMessage)。服务端那边会有超时检测和重试机制,来保证分布式事务运行结果的正确性。

综合上述,seata 在 Phase2 通过 UndoLog 自动完成分支事务提交与回滚,在这个过程中不需要业务方做任何处理,业务方无感知,因此在该阶段对业务代码也是无侵入的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • 老师在课堂上对“五步营销法”进行了详细的讲解与剖析,演讲者在梳理自己的产品或者在向客户介绍自己自家企业产品的时候,...
    Diane1006阅读 932评论 0 0
  • 今天是2018年10月5日是【晓晖有话说】陪伴你的第六百四十一天。 【我们的人类世界】:我们自以为活在天堂,在另一...
    晖晖晓阅读 400评论 0 0
  • 妻怀小女的时候,我特别亢奋,就像攻城的桃花,控岸的柳树,心如春天似柳絮飞扬,更如旌旗鲜明的胜利之胜、威武之师、王者...
    Qiuqiyao阅读 390评论 0 0
  • 简单实用缓存 iOS网络缓存扫盲篇--使用两行代码就能完成80%的缓存需求
    相濡wei阅读 944评论 0 0
  • 今日,闲来无聊看了巨幕厅的寻梦环游记,5000人的场座无虚席,以至于我跟肖爷只能选在了靠边的位置。但是说起这部片子...
    汝阳阅读 200评论 1 0