Fescar example解析 - GlobalTransaction

开篇

 这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。

 这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,


背景介绍

事务资料摘自Fescar概览
与XA 的模型类似,我们定义 3 个组件来协议分布式事务的处理过程。

FESCAR Model
  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个典型的分布式事务过程:

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
  2. XID 在微服务调用链路的上下文中传播。
  3. RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
  4. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
FESCAR Architecture


执行过程

Fescar TM执行过程

说明,整个执行流程如下:

  • 1.TransactionalTemplate通过GlobalTransactionContext.getCurrentOrCreate()返回GlobalTransaction对象。

  • 2.GlobalTransactionContext的createNew()方法创建DefaultGlobalTransaction对象。

  • 3.DefaultGlobalTransaction的构造方法当中创建DefaultTransactionManager对象。

  • 4.TransactionalTemplate通过DefaultGlobalTransaction执行begin/commit/rollback等操作。

  • 5.DefaultGlobalTransaction内部通过DefaultTransactionManager执行begin/commit/rollback等操作。


源码解析

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. 创建一个GlobalTransaction对象
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. 通过GlobalTransaction开始执行事务
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
}

说明:

  • 创建tx对象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。

  • 执行全局事务,tx.beigin()及其他省略的一部分代码。


public class GlobalTransactionContext {

    private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();

    private GlobalTransactionContext() {
    }

    // 创建GlobalTransaction对象
    private static GlobalTransaction createNew() {
        GlobalTransaction tx = new DefaultGlobalTransaction();
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrent() {
        GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
        if (tx != null) {
            return tx;
        }
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        tx = new DefaultGlobalTransaction(xid);
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }

    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
}

说明:

  • createNew()方法创建GlobalTransaction tx对象,类型是DefaultGlobalTransaction。

  • 保存tx到线程当中实现线程隔离,THREAD_TRANSACTION_CONTEXT.set(tx)。

  • GlobalTransaction对象负责执行事务的begin()、commit()、rollback()等方法。


public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    private String xid;

    private GlobalStatus status = GlobalStatus.UnKnown;

    private GlobalTransactionRole role = GlobalTransactionRole.Launcher;

    DefaultGlobalTransaction(String xid) {
        this.transactionManager = DefaultTransactionManager.get();
        this.xid = xid;
        if (xid != null) {
            status = GlobalStatus.Begin;
            role = GlobalTransactionRole.Participant;
        }
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }

    }

    @Override
    public void commit() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.commit(xid);

    }

    @Override
    public void rollback() throws TransactionException {
        check();
        RootContext.unbind();
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            return;
        }
        status = transactionManager.rollback(xid);

    }

    @Override
    public GlobalStatus getStatus() throws TransactionException {
        check();
        status = transactionManager.getStatus(xid);
        return status;
    }
}

说明:

  • DefaultGlobalTransaction构造函数创建transactionManager 对象,this.transactionManager = DefaultTransactionManager.get();

  • DefaultGlobalTransaction的begin/commit/rollback通过TransactionManager的begin/commit/rollback实现。


public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

说明:

  • DefaultTransactionManager 是单例实现全局唯一。

  • DefaultTransactionManager 是TM实现begin/commit/rollback的核心逻辑。

  • DefaultTransactionManager 的begin/commit/rollback通过和TC通信实现。

  • DefaultTransactionManager 的syncCall实现和TC通信。


Fescar源码分析连载

Fescar 源码解析系列

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355