TX-LCN 核心流程源码分析

TX-LCN核心源码解读

TX-LCN是基于Java编写的分布式事务解决方案框架,主要提供三种主流的解决方案

  • LCN模式,通过代理JDBC Connection来控制协调多组本地原子事务的提交与关闭
  • TCC模式,属于框架级别解决方案,对业务入侵性极大
  • TXC模式,核心方案为查询 + 分布式锁的分布式事务解决方案,由淘宝团队提出

核心组件

  • TC,作为分布式事务组件的客户端角色,主要作用在于治理本地事务
  • TM,作为分布式事务组件的服务端角色,主要作用在于协调事务组

核心概念

  • 事务组,group,描述整个分布式环境下运行的各个事务(以一个request所需要完成的事务)组合而成的一组事务。
  • 事务单元,unit,描述一个事务组内除开主事务之外的从事务,一个从事务表示一个事务单元。

核心流程

事务核心流程.png

核心源码

LCN模式作为实践方案,源码也以LCN 作为解读对象。LCN 模式是什么模式?

LCN模式是通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

创建事务组

/**
 * Client创建事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionInfo transactionInfo
 * @param transactionType transactionType
 * @throws TransactionException 创建group失败时抛出
 */
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
    throws TransactionException {
    //创建事务组
    try {
        // 日志
        txLogger.transactionInfo(groupId, unitId,
                                 "create group > {} > groupId: {xid}, unitId: {uid}", transactionType);
        // 创建事务组消息
        reliableMessenger.createGroup(groupId);
        // 缓存发起方切面信息
        aspectLogger.trace(groupId, unitId, transactionInfo);
    } catch (RpcException e) {
        // 通讯异常
        dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
    } catch (LcnBusinessException e) {
        // 创建事务组业务失败
        dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
    }
    txLogger.transactionInfo(groupId, unitId, "create group over");
}

TC 发起创建事务组仅仅是像服务端发起请求,参数为groupId ,核心的代码在于TM 接到请求后的处理。


public class CreateGroupExecuteService implements RpcExecuteService {

    // ...
    @Override
    public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
        // ...
        transactionManager.begin(transactionCmd.getGroupId());
    }
}

public class DefaultDTXContextRegistry implements DTXContextRegistry {

    private final FastStorage fastStorage;

    @Override
    public DTXContext create(String groupId) throws TransactionException {
        // ..
        fastStorage.initGroup(groupId);
    }
}


public class RedisStorage implements FastStorage {

    // ...
    @Override
    public void initGroup(String groupId) {
        // 将groupId存入Redis
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, "root", "");
        redisTemplate.expire(REDIS_GROUP_PREFIX + groupId, managerConfig.getDtxTime() + 10000, TimeUnit.MILLISECONDS);
    }
}

加入事务组


/**
 * Client加入事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param transactionInfo transactionInfo
 * @throws TransactionException 加入事务组失败时抛出
 */
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo) throws TransactionException {
    
    // 询问TM加入事务组
    // 该groupId由远程RPC通过header方式携带到从事务
    reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState());

    // 异步检测
    dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
    
    // ...
}

// 
@Override
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
    txLogger.taskInfo(groupId, unitId, "start delay checking task");
    // 异步阻塞的方式
    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
        try {
            TxContext txContext = globalContext.txContext(groupId);
            if (Objects.nonNull(txContext)) {
                synchronized (txContext.getLock()) {
                    txLogger.info(groupId, unitId, Transactions.TAG_TASK,
                            "checking waiting for business code finish.");
                    txContext.getLock().wait();
                }
            }
            int state = reliableMessenger.askTransactionState(groupId, unitId);// 询问事务组是否成功
            txLogger.taskInfo(groupId, unitId, "ask transaction state {}", state);
            if (state == -1) {
                txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
                onAskTransactionStateException(groupId, unitId, transactionType);
            } else {
                transactionCleanTemplate.clean(groupId, unitId, transactionType, state);// 事务清理,即commit or rollback
                aspectLogger.clearLog(groupId, unitId);
            }

        } catch (RpcException e) {
            onAskTransactionStateException(groupId, unitId, transactionType);
        } catch (TransactionClearException | InterruptedException e) {
            txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
        }
    }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);// 时间为最大事务时间,该时间由TM配置,在TC初始化时从TM放取到
    delayTasks.put(groupId + unitId, scheduledFuture);
}

//  询问事务是否成功
private void onAskTransactionStateException(String groupId, String unitId, String transactionType) {
    try {
        // 通知TxManager事务补偿
        txMangerReporter.reportTransactionState(groupId, unitId, TxExceptionParams.ASK_ERROR, 0);
        log.warn("{} > has compensation info!", transactionType);

        // 事务回滚, 保留适当的补偿信息
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, 0);
    } catch (TransactionClearException e) {
        log.error("{} > clean transaction error.", transactionType);
    }
}

TC 核心工作是申请加入事务组,并启动异步任务在事务最大时间后访问TM 事务组的全局事务状态来进行事务协调。

创建异步任务对象并将其缓存在本地内存的delayTasks, 如果在事务最大时间内已经完成并调用则将该任务取消。

@Override
public void stopDelayChecking(String groupId, String unitId) {
    ScheduledFuture scheduledFuture = delayTasks.get(groupId + unitId);
    if (Objects.nonNull(scheduledFuture)) {
        txLogger.taskInfo(groupId, unitId, "cancel {}:{} checking.", groupId, unitId);
        scheduledFuture.cancel(true); // 取消任务
    }
}

而在TM 端则进行如下处理


// 加入事务组
// RedisStorage
public void saveTransactionUnitToGroup(String groupId, TransactionUnit transactionUnit) throws FastStorageException {
    if (Optional.ofNullable(redisTemplate.hasKey(REDIS_GROUP_PREFIX + groupId)).orElse(false)) {
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, transactionUnit.getUnitId(), transactionUnit);
        return;
    }
    throw new FastStorageException("attempts to the non-existent transaction group " + groupId, FastStorageException.EX_CODE_NON_GROUP);
}

// TC 询问该事务组全局事务状态
public int transactionState(String groupId) {

    
    int state = exceptionService.transactionState(groupId); // 查询数据库t_tx_exception得到该事务的状态
    
    //存在数据时返回数据状态
    if (state != -1) {
        return state;
    }
    
    // 查询redis该事务组的全局状态
    return dtxContextRegistry.transactionState(groupId);
    
    // 为什么会先查数据库,再查redis ? 后文会有说明,主要是考虑事务补偿时的问题
}

// 返回的最终状态
public class AskTransactionStateExecuteService implements RpcExecuteService {

    @Override
    public Serializable execute(TransactionCmd transactionCmd) {
        int state = transactionManager.transactionState(transactionCmd.getGroupId());
        return state == -1 ? 0 : state;
    }
}

通知事务组

主事务的业务代码执行完毕,最终必须调用通知事务组进行全局事务协调。通知完成后进行事务清理


/**
 * Client通知事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param state           transactionState
 */
public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
    // ...
    // 事务通知
    reliableMessenger.notifyGroup(groupId, state);
    // 事务清理
    transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
    // 通知异常(RPC调用异常)
    dtxExceptionHandler.handleNotifyGroupMessageException(Arrays.asList(groupId, state, unitId, transactionType), e);
    // ...
    
}

// 当TC调用TM抛出异常时,会正常的按照当前事务的状态进行提交,并将结果上报到TM

public void handleNotifyGroupMessageException(Object params, Throwable ex) {
    
    // 参数中取出事务的状态
    // ....

    // 按状态正常结束事务(切面补偿记录将保留)
    // TxManager 存在请求异常或者响应异常两种情况。当请求异常时这里的业务需要补偿,当响应异常的时候需要做状态的事务清理。
    // 请求异常时
    //     参与放会根据上报补偿记录做事务的提交。
    // 响应异常时
    //     参与反会正常提交事务,本地业务提示事务。

    // 该两种情况下补偿信息均可以忽略,可直接把本地补偿记录数据删除。


    String unitId = (String) paramList.get(2);
    String transactionType = (String) paramList.get(3);
    try {
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, state);// 本地事务提交
    } catch (TransactionClearException e) {
        log.error("{} > compensationClean transaction error.", transactionType);
    }

    // 上报Manager,上报直到成功.
    txMangerReporter.reportTransactionState(groupId, null, TxExceptionParams.NOTIFY_GROUP_ERROR, state);
    // 提交的事务记录到t_tx_exception表中,所以会看到前文TC询问事务状态时,会优先查询数据库,而不是直接查redis
}


通知事务组的概念,应该理解为,主事务告知TM 进行全部的事务协调,即TM 仅会通知各个从事务进行commit or rollback,而不会通知主事务进行commit or rollback 。因为在前文看到创建事务组时,TM 并没有将主事务unitId 记录下来。而从事务加入事务组时,除了记录全局事务组Id,还包括事务单元unitId .


public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
    try {
        // 从redis取事务状态
        int transactionState = transactionManager.transactionStateFromFastStorage(transactionCmd.getGroupId());
        boolean hasThrow = false;
        if (transactionState == 0) {
            commitState = 0;
            hasThrow = true;
        }
        // 事务状态为1进行全局事务提交
        if (commitState == 1) {
            transactionManager.commit(dtxContext);
        } else if (commitState == 0) {
            transactionManager.rollback(dtxContext);
        }
        // ...
    } catch (TransactionException e) {
        throw new TxManagerException(e);
    } finally {
        transactionManager.close(transactionCmd.getGroupId());
        // 系统日志
        txLogger.transactionInfo(transactionCmd.getGroupId(), "", "notify group over");
    }
    return null;
}

// 事务通知
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    for (TransactionUnit transUnit : dtxContext.transactionUnits()) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.info(dtxContext.getGroupId(),
                notifyUnitParams.getUnitId(), Transactions.TAG_TRANSACTION, "notify %s's unit: %s",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            // 这里在5.0.1会出现信道问题,什么是信道问题?比如此时有两台push注册到TM上,而某一刻的全局事务所在的本地事务只在其中一台,而通知的时候如果modId一致则会取到第一个
            // 如下get(0) . 解决的办法是生成modId时去的是Mac地址+端口+服务名称,保证了不同实例的全局唯一
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));
            if (!MessageUtils.statusOk(respMsg)) {
                // 提交/回滚失败的消息处理
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // 提交/回滚通讯失败
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.transactionInfo(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}

// 当通知出现异常时,将信息记录到t_tx_exception表
public class ManagerRpcExceptionHandler implements RpcExceptionHandler {

    @Override
    public void handleNotifyUnitMessageException(Object params, Throwable e) {
        // notify unit message error, write txEx
        List paramList = ((List) params);
        String modName = (String) paramList.get(1);

        NotifyUnitParams notifyUnitParams = (NotifyUnitParams) paramList.get(0);
        WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(notifyUnitParams.getGroupId(),
                notifyUnitParams.getUnitId(), modName, notifyUnitParams.getState());
        writeTxExceptionReq.setRegistrar((short) 0);
        compensationService.writeTxException(writeTxExceptionReq);// 记录到t_tx_exception
        // 记住客户端主动查询时,优先查数据库,再查redis的事务状态
    }
}

总结

TX-LCN作为分布式解决方案是比较优秀的方案,代码逻辑也比较简单,但是如果应用Crash,就可能出现数据不一致的情况,而且这种数据不一致的情况必须人肉修复。

比如主事务在进行NotifyGroup 时出现RpcException 主事务会根据当前事务的状态进行commit or rollback ,之后会上报TM 记录补偿信息,假如在记录补偿时失败了(应用在这个点Crash)了,那么主事务提交了,并且TM 并不能完整地协调好从事务 的全局事务状态。

为什么需要人肉修复呢?其实从源码上可以分析出,TX-LCN解决的场景时将本地的事务通过事务协调器进行协调,但是本质上并没有将事务分布式节点化,即本地事务的成功与失败无法在不同的节点进行处理

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容