Fescar源码阅读-全自动的分布式事务AT

全局事务如何运作,只针对AT模式。(源码持续更新,本文仅供参考)



前文大致了解了Fescar系统总体架构、消息定义和交互方式,现在来看看Fescar如何通过这些消息的交互,最终转换为对分布式事务的管控。


再看一次这张图:


TM+RM+TC

首先Fescar中,分布式事务的生命周期是交给TC来协调管理的,对于一个全局事务,TC需要管理全局事务以及全局事务下包含的所有branch分支事务(已注册的)。


全局事务生命周期(begin, commit, rollback)可以用以下代码来体现:
2.1部分,可开启一个或者多个本地事务,同时本地事务加入全局事务, 被TM和TC管理。

// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. begin transaction
try {
    tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
}

Object rs = null;
try {

    // 2.1 自己的业务逻辑
    rs = business.execute();
} catch (Throwable ex) {
    // 3. any business exception, rollback.
    try {
        tx.rollback();
    }
}

// 4. everything is fine, commit.
try {
    tx.commit();

} catch (TransactionException txe) {

}
return rs;

在了解Fescar如何管理事务之前,需要想看看关键的两个类GlobalSessionBranchSession,顾名思义,这两个类分别定义、维护了全局事务和branch分支事务的信息和状态。

public class GlobalSession implements SessionLifecycle, SessionStorable {
    // 全局事务ID
    private long transactionId;

    // 全局事务状态
    private GlobalStatus status;

    // 应用ID 标识发起全局事务的服务
    private String applicationId;

    //全局事务分组  默认default
    private String transactionServiceGroup;
    
    //全局事务名称
    private String transactionName;

    private int timeout;

    private long beginTime;

    private boolean active;

    //branch事务Session
    private ArrayList<BranchSession> branchSessions = new ArrayList<>();
    // 省略...
}
public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {
    // 全局事务ID
    private long transactionId;
    
    // branch事务ID
    private long branchId;
    
    // 忽略,暂未使用
    private String resourceGroupId;

    // 资源ID  
    private String resourceId;
    
    // 锁 key
    private String lockKey;

    // AT, MT
    private BranchType branchType;

    private BranchStatus status = BranchStatus.Unknown;

    private String applicationId;

    private String txServiceGroup;
    
    // 标识具体client  applicationID + client_ip + client_port
    private String clientId;
    
    // 忽略,暂未使用
    private String applicationData;

    private ConcurrentHashMap<Map<String, Long>, Set<String>> lockHolder = new ConcurrentHashMap<Map<String, Long>, Set<String>>();
    // 省略...
}

开启全局事务(TM连接和注册)

FescarFlow-begin.png
  • TM发送GlobalBeginRequest到TC,请求开启全局事务
  • TC处理请求,生成GlobalSession,生成TransacntionId,xid,并返回给TM,全局事务开启成功

提交本地事务(RM连接和注册)

FescarFlow-RM-register.png

以DataSourceRM为例:

  • RM的操作从DataSourceProxxy和ConnectionProxy发起,代理了真正的DataSource和Connection
  • 本地数据库事务提交时,RM判断是否存在全局事务,如果是这注册branchTransaction(注册时将会携带lockkey,TC加锁,后续再看这些细节~)
  • TC生成BranchSession 并返回。
  • RM注册完成后其实并不会阻塞等待全局事务的提交(fescar最新实现有全局锁模式,但已经不属于当前流程,暂时先不管),而是先生成redoLog(划重点,redolog是Fescar可以放心的提交本地事务的关键,我们下一章在看),然后直接提交本地事务,最后向TC报告。

commit部分代码如下:

public void commit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } 

提交/回滚全局事务

全局事务的提交、回滚都由TM控制,发起,TC协调。

  • 全局事务提交
    TM发起提交,TC负责校验各个branch session的状态,是否正常提交,如果失败可发起重试。
  • TC通知RM提交branch事务,此时DataSourceRM将会删除undolog。
    TC核心接口:com.alibaba.fescar.server.coordinator.Core
void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException;

void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException;

部分代码如下:

// 通知RM执行commit,DataSourceRM将会删除undolog
BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(), XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());
witch (branchStatus) {
    case PhaseTwo_Committed:
        globalSession.removeBranch(branchSession);
        continue;
    case PhaseTwo_CommitFailed_Unretryable:
        if (globalSession.canBeCommittedAsync()) {
            LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
            continue;
        } else {
            SessionHelper.endCommitFailed(globalSession);
            LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",
                globalSession.getTransactionId(), branchSession.getBranchId());
            return;
        }
    default:
        if (!retrying) {
                     //转入重试队列
                    queueToRetryCommit(globalSession);
            return;
        }
  • 全局事务回滚
    TM发起回滚,TC通知RM回滚。
BranchStatus branchStatus = resourceManagerInbound.branchRollback(branchSession.getBranchType(), XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

switch (branchStatus) {
    case PhaseTwo_Rollbacked:
        globalSession.removeBranch(branchSession);
        LOGGER.error("Successfully rolled back branch " + branchSession);
        continue;
    case PhaseTwo_RollbackFailed_Unretryable:
        SessionHelper.endRollbackFailed(globalSession);
        LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch["
            + branchSession.getBranchId() + "] rollback failed");
        return;
    default:
        LOGGER.info("Failed to rollback branch " + branchSession);
        if (!retrying) {
            queueToRetryRollback(globalSession);
        }

看看DataSrouceRM如何rollback

DataSourceProxy dataSourceProxy = get(resourceId);

try {
    UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
    if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
        return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
    } else {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
return BranchStatus.PhaseTwo_Rollbacked;

很明显,对于已经提交的本地事务,DataSourceRM直接使用commit时生成的undolog进行数据回滚!
完整的分布式事务完成!
很明显,undolog就是Fescar可以打破二段提交的机制,允许本地事务在第一阶段就提交的基础,也是Fescar自信拥有高性能和高吞吐量的底气所在。

那么undolog道理是如何形成,里面内容是什么呢?Fescar如何利用undolog进行回滚呢?下一章继续

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容