spring事务

声明

  • spring对事务的处理是如何获取事务增强器的文章spring事务如何获取事务增强器
  • 本文章是对spring声明式事务进行分析,从事务增强器开始分析,所有标注@Transaction的方法都会被拦截。

实验篇 不同传播属性事务嵌套实验

原理篇 spring事务总流程分析时序图

spring事务总流程.jpg

TransactionInterceptor

  • 方法拦截器直接从invoke方法分析。
public class TransactionInterceptor extends TransactionAspectSupport implements 
             MethodInterceptor, Serializable {
        @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
    
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }
}
  • 看invokeWithinTransaction对应的方法,这里调用TransactionAspectSupport的方法。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {

        // 获取事务对应属性
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        //获取bean中事务管理器
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        //构造方法唯一标识别 类名+方法名
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
        //声明式事务
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 创建事务信息--嵌套事务源码 回滚 提交都用到它
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // 执行增强方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                //清除
                cleanupTransactionInfo(txInfo);
            }
            //提交
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        //编程式事务就不贴出来
    }
  • 这里面的核心流程
  1. 获取事务的属性
  2. 加载配置中心配置的TransactionManager
  3. 不用事务处理方式使用不同逻辑, 编程式事务需要有书屋属性,声明式事务暴露对外接口供自身回调
  4. 收集事务信息
  5. 执行目标方法
  6. 异常处理
  7. 提交事务前的事务信息清理
  8. 提交事务。
    以下将对各个步骤进行分析。

获取事务的属性 对应核心流程第一点

  • TransactionAttributeSource接口的getTransactionAttribute方法是获取对应属性
  • AbstractFallbackTransactionAttributeSource(子类是AnnotationTransactionAttributeSource),实现了注解对应的getTransactionAttribute方法
@Override
    public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
        // First, see if we have a cached value.
        Object cacheKey = getCacheKey(method, targetClass);
        Object cached = this.attributeCache.get(cacheKey);
        if (cached != null) {
            //从缓存中拿
        }
        else {
            // 缓存中不存在 则调用
            TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
            // Put it in the cache.
            if (txAttr == null) {
                 //防止缓存击穿透
                this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
            }
            else {
                //放入缓存
                this.attributeCache.put(cacheKey, txAttr);
            }
            return txAttr;
        }
    }
  • computeTransactionAttribute方法
protected TransactionAttribute computeTransactionAttribute(Method method, Class<?> targetClass) {

        // 第一当前方法上面找
        TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
        if (txAttr != null) {
            return txAttr;
        }

        // 第二 类方法上面 找
        txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }
        //第三接口上找有没@Transaction注解
        if (specificMethod != method) {
            ....
        }

        return null;
    }

收集事务信息 对应核心流程第四点

  • 收集事务信息总时序图


    收集事务总时序图.jpg
  • 调用TransactionAspectSupport的createTransactionIfNecessary方法,返回TransactionInfo

protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

        // 使用DelegatingTransactionAttribute封装TransactionAttribute 此类提供了更多的功能
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                //获取事务 -- 创建事务的核心流程
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        //构建事务信息 TransactionInfo 并返回
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
  • 总体流程为(其中4表示核心流程第四步):
    4.1 使用DelegatingTransactionAttribute封装TransactionAttribute 此类提供了更多的功能
    4.2 获取事务 核心 这里面也包含对嵌套事务的处理
核心是doBegin方法。

4.3 构建事务信息TransactionInfo并返回 也就是把TransactionStatus放进TransactionInfo里面。

获取事务 对应核心流程第4.2点

  • 获取事务总时序图 对应核心流程4.2


    获取事务流程.jpg
  • AbstractPlatformTransactionManager.getTransaction方法

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        //获取事务
        Object transaction = doGetTransaction();
    
        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
        //判断当前线程是否存在事务, 判断依据为当前线程记录的连接不为空且连接中(connectionHolder)
        //的transactionActive属性不为空
        if (isExistingTransaction(transaction)) {
            // 嵌套事务的处理
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // 超时验证
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // 不存在事务但是事务属性是PROPAGATION_MANDATORY会抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        //以下几个都需要新建事务
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            //空挂起
            SuspendedResourcesHolder suspendedResources = suspend(null);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //构建transaction 包括设置ConnectionHolder 隔离级别 timout
                //如果是新连接需要绑定到当前线程
                doBegin(transaction, definition);
                //新同步事务的设定 针对当前线程
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
  • 总结流程为
    4.2.1 获取事务 这里调DataSourceTransactionManager.doGetTransaction方法创建基于JDBC的事务
protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        //如果当前线程存在数据库连接直接使用原有连接
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        //false表示非新创连接
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

4.2.2 如果当前存在事务则转向嵌套事务

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        //PROPAGATION_NEVER传播属性
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        //PROPAGATION_NOT_SUPPORTED传播属性
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        //PROPAGATION_REQUIRES_NEW传播属性 
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        
            //新事务的建立 先挂起旧事务
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //同4.2.6
                doBegin(transaction, definition);
                //同4.2.7
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
        //PROPAGATION_NESTED传播属性
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
        
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // JTA不能使用保存点要新建事务
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

4.2.3 事务超时设定
4.2.4 事务propagationBehavior 不同传播行为不同表现
4.2.5 (走正常的流程 设置正常的传播属性时)构建DefaultTransactionStatus
4.2.6 完善transaction 包括设置ConnectionHolder,隔离级别,timeout,如果是新连接则绑定到当前线程。DataSourceTransactionManager.doBegin方法

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
                        //如果当前线程connectionHolder已经存在 则没有必要获取
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                //跟数据库连接池挂钩
                Connection newCon = this.dataSource.getConnection();
                
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //隔离级别设置 只读属性都是通过设置Connection 
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            
            // 更改自动提交设置由spring控制提交
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
            
                con.setAutoCommit(false);
            }
            //设置判断当前线程是否存在事务的依据
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // Bind the session holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }
        catch (Throwable ex) {
            ...
        }
    }

4.2.7 新同步事务的设定 针对当前线程

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

异常处理 对应核心流程第六点

  • 未完待续

提交 对应核心流程第八点

  • 未完待续
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351