Spring事务报错Transaction rolled back because it has been marked as rollback-only

前言

文章详细分析了在Spring框架中,由于事务嵌套导致的Transactionrolledbackbecauseithasbeenmarkedasrollback-only异常的原因和解决办法。问题源于内层事务异常被外层事务捕获,内层事务被标记为回滚,但外层事务尝试提交,引发冲突。解决方案包括让内层事务抛出的异常被外层事务处理后再抛出,或者改变事务的传播行为。

一、背景

业务在执行时,出现报错,日志如下所示:

org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:871)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:708)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:707)
    at com.xtm.obd.mq.OrderPayCloseRocketMQConsumer$$EnhancerBySpringCGLIB$$a41fd4d2.onMessage(<generated>)
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:461)
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:421)
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  • 异常:Transaction rolled back because it has been marked as rollback-only

  • 原因:已经标记为rollback-only,但是后面的程序执行后又commit事务,抛出此异常。虽然都回滚,不影响正常业务。但是日志打印这种异常让人很难受。

二、测试代码:

TestController

@Autowired
TestRollbackService testRollbackService;

@RequestMapping("/test1")
public void test1(){
    try{
        testRollbackService.test1();
    }catch(Exception e){
        e.printStackTrace();
    }
}

TestRollbackServiceImpl

@Autowired
StudentMapper studentMapper;

@Autowired
TestTransactionService testTransactionService;

@Transactional(rollbackFor = Exception.class)
public void test1(){
    Student studentSelect = new Student();
    studentSelect.setId(new Long(1));
    Student student = studentMapper.selectByPrimaryKey(studentSelect);
    try{
        testTransactionService.test2();
    }catch(Exception e){
        e.printStackTrace();
    }
}

TestTransactionServiceImpl

@Autowired
StudentMapper studentMapper;

@Transactional(rollbackFor = Exception.class)
public void test2(){
    Student studentForInsert = new Student();
    studentForInsert.setId(new Long(19));
    studentForInsert.setName("测试11");
    studentForInsert.setScore(new BigDecimal(69));
    studentMapper.updateByPrimaryKey(studentForInsert);
    System.out.println(1/0);
}
  • TestRollbackService.test1(方法A)中调用了TestTransactionService.test2(方法B),上述代码可以触发回滚异常的报错

 

  • 两个方法都加了事务注解,并且两个方法都会受到到事务管理的拦截器增强,并且事务传播的方式都是默认的,也就是REQUIRED,当已经存在事务的时候就加入事务,没有就创建事务。这里A和B都受事务控制,并且是处于同一个事务的。

 

  • A调用B,A中抓了B的异常,当B发生异常的时候,B的操作应该回滚,但是A吃了异常,A方法中没有产生异常,所以A的操作又应该提交,二者是相互矛盾的。

 

  • spring的事务关联拦截器在抓到B的异常后就会标记rollback-only为true,当A执行完准备提交后,发现rollback-only为true,也会回滚,并抛出异常告诉调用者。

程序时序图如下:

image.png

三、原理分析

3.1 程序入口

程序入口肯定是代理类,这里走是cglib的代理

  • 入口方法: org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept

该方法中

retVal = (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
  • 继续调用: org.springframework.aop.framework.ReflectiveMethodInvocation#proceed

  • 然后到了事务管理的拦截器: org.springframework.transaction.interceptor.TransactionInterceptor#invoke

@SuppressWarnings("serial")
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    
    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
            @Override
            @Nullable
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
            @Override
            public Object getTarget() {
                return invocation.getThis();
            }
            @Override
            public Object[] getArguments() {
                return invocation.getArguments();
            }
        });
    }
}

3.2 invokeWithinTransaction(事务管理的主方法)

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final TransactionManager tm = determineTransactionManager(txAttr);

        if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
            boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
            boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
                    COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
            if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
                throw new IllegalStateException("Coroutines invocation not supported: " + method);
            }
            CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);

            ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                Class<?> reactiveType =
                        (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
                ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
                if (adapter == null) {
                    throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type [" +
                            method.getReturnType() + "] with specified transaction manager: " + tm);
                }
                return new ReactiveTransactionSupport(adapter);
            });

            InvocationCallback callback = invocation;
            if (corInv != null) {
                callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
            }
            Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
            if (corInv != null) {
                Publisher<?> pr = (Publisher<?>) result;
                return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
                        KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
            }
            return result;
        }

        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }

            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }

            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            Object result;
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                    try {
                        Object retVal = invocation.proceedWithInvocation();
                        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                            // Set rollback-only in case of Vavr failure matching our rollback rules...
                            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                        }
                        return retVal;
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
    }
}

程序执行的是最后的else分支,步骤很清晰

  • 1.获取 TransactionAttribute
  • 2.基于TransactionAttribute获取TransactionManager
  • 3.基于TransactionAttribute获取 joinpointIdentification(没研究什么作用)
  • 4.基于1,2,3创建的对象获取 TransactionAspectSupport.TransactionInfo,transactionInfo是TransactionAspectSupport的一个内部类
  • 5.执行业务方法
  • 6.抓到异常就回滚,并清除事务,然后向上抛异常;没有异常就清除事务,然后提交对象之间的关联关系
image.png

3.3 各个对象的获取

  • TransactionManager的获取比较简单,程序里获取到的其实就是自己配置的bean
  • 创建TransactionInfo的过程中要先获取TransactionStatus,TransactionStatus又需要拿到ConnectionHolder

3.3.1 createTransactionIfNecessary

org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    @SuppressWarnings("serial")
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
}

3.3.2 获取TransactionStatus

这里先调用status = tm.getTransaction((TransactionDefinition)txAttr)创建TransactionStatus

org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {

        // Use defaults if no transaction definition given.
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();

        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(def, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }
}

3.3.3 获取transactionStatus前先获取DataSourceTransactionObject

  • 程序最上面: Object transaction = this.doGetTransaction(); 创建DataSourceTransactionObject对象,这是DataSourceTransactionManager的内部类

org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
        ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }
}
  • 这里还获取了ConnectionHolder对象,这里newConnectionHolder为false

  • 获取的方法如下: org.springframework.transaction.support.TransactionSynchronizationManager#getResource

public abstract class TransactionSynchronizationManager {
            
    @Nullable
    public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        return doGetResource(actualKey);
    }
}   

看代码很有意思,好像是通过一个key获取的,类似于从一个池子里面拿东西一样,其实当A方法执行的时候并没有获取到ConnectionHolder,拿到的是null

org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource

public abstract class TransactionSynchronizationManager {
    
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");  
            
    @Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }
}   

resources对象其实是一个ThreadLocal,意思是同一个线程中拿到的ConnectionHolder是相同的

3.3.4 doBegin方法

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.obtainDataSource().getConnection();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }

                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }

                con.setAutoCommit(false);
            }

            this.prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if (timeout != -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
            }

        } catch (Throwable var7) {
            Throwable ex = var7;
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.obtainDataSource());
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }

            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }
}
  • 截取该方法的重要几行:
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    Connection newCon = this.obtainDataSource().getConnection();
    if (this.logger.isDebugEnabled()) {
        this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    }

    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
  • 先获取连接(java.sql.Connection),然后创建ConnectionHolder,newConnectionHolder设置为true,如果之前已经不为空了,newConnectionHolder就为false

  • 如果newConnectionHolder 为 true,还需要将 connectionHolder放到threadLocal里面,让后面的方法可以获取到相同的ConnectionHolder,截取的代码如下:

if (txObject.isNewConnectionHolder()) {
    TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}

到这里TransactionStatus就创建好了

3.3.5 获取TransactionInfo

org.springframework.transaction.interceptor.TransactionAspectSupport#prepareTransactionInfo

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, String joinpointIdentification,
            @Nullable TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method...
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists.
            txInfo.newTransactionStatus(status);
        }
        else {
            // The TransactionInfo.hasTransaction() method will return false. We created it only
            // to preserve the integrity of the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled()) {
                logger.trace("No need to create transaction for [" + joinpointIdentification +
                        "]: This method is not transactional.");
            }
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
    }
}
  • 细看 txInfo.bindToThread();
    org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#bindToThread
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
            new NamedThreadLocal<>("Current aspect-driven transaction");

    protected static final class TransactionInfo {  

        @Nullable
        private TransactionInfo oldTransactionInfo;
        
        private void bindToThread() {
            // Expose current TransactionStatus, preserving any existing TransactionStatus
            // for restoration after this transaction is complete.
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }
    }
}

bindToThread()的的作用是获取oldTransactionInfo。

3.4 B方法抛异常后的回滚操作

org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    private class ReactiveTransactionSupport {

        private final ReactiveAdapter adapter;  
    
        private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) {
            if (txInfo != null && txInfo.getReactiveTransaction() != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                            "] after exception: " + ex);
                }
                if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                    return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
                                logger.error("Application exception overridden by rollback exception", ex);
                                if (ex2 instanceof TransactionSystemException) {
                                    ((TransactionSystemException) ex2).initApplicationException(ex);
                                }
                                return ex2;
                            }
                    );
                }
                else {
                    // We don't roll back on this exception.
                    // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                    return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
                                logger.error("Application exception overridden by commit exception", ex);
                                if (ex2 instanceof TransactionSystemException) {
                                    ((TransactionSystemException) ex2).initApplicationException(ex);
                                }
                                return ex2;
                            }
                    );
                }
            }
            return Mono.empty();
        }
    }
}

txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());调用transactionManager进行rollback

org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback

@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
    @Override
    public final Mono<Void> rollback(ReactiveTransaction transaction) {
        if (transaction.isCompleted()) {
            return Mono.error(new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction"));
        }
        return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
            GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
            return processRollback(synchronizationManager, reactiveTx);
        });
    }
}   
  • 进一步调自身的processRollback
    org.springframework.transaction.reactive.AbstractReactiveTransactionManager#processRollback
@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
    
    private Mono<Void> processRollback(TransactionSynchronizationManager synchronizationManager,
            GenericReactiveTransaction status) {

        return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> {
            if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                return doRollback(synchronizationManager, status);
            }
            else {
                Mono<Void> beforeCompletion = Mono.empty();
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                    }
                    beforeCompletion = doSetRollbackOnly(synchronizationManager, status);
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                return beforeCompletion;
            }
        })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
                synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
                .then(Mono.error(ex)))
                .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
                .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
            .then(cleanupAfterCompletion(synchronizationManager, status));
    }
}   
  • this.triggerBeforeCompletion(status) 这个方法好像释放了连接

  • B不是新事务,所以最后会执行 this.doSetRollbackOnly(status);

  • org.springframework.jdbc.datasource.DataSourceTransactionManager#doSetRollbackOnly

public class ReactiveRedissonTransactionManager extends AbstractReactiveTransactionManager {
    
    @Override
    protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) throws TransactionException {
        return Mono.fromRunnable(() -> {
            ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) status.getTransaction();
            to.getResourceHolder().setRollbackOnly();
        });
    }
}

org.springframework.jdbc.datasource.DataSourceTransactionManager.DataSourceTransactionObject#setRollbackOnly

public abstract class ResourceHolderSupport implements ResourceHolder {

    private boolean rollbackOnly = false;
    
    public void setRollbackOnly() {
        this.rollbackOnly = true;
    }
}

这里可以看到最终设置的是connectionHolder的rollbackonly属性

3.5 A方法尝试提交时的操作

org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }
}

同样的,这里调用transactionManager进行提交
org.springframework.transaction.support.AbstractPlatformTransactionManager#commit

@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    @Override
    public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus, false);
            return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            processRollback(defStatus, true);
            return;
        }

        processCommit(defStatus);
    }
}

这个方法判断了一些无法提交的情况,程序这里走第二个分支,部分代码如下:

if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    if (defStatus.isDebug()) {
        logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    }
    processRollback(defStatus, true);
    return;
}

判断条件为:

  • 1.全局不是rollbackonly的时候也提交(这个可能是一个配置的参数,配合在rollbackonly的时候也提交,也就是出现现在这种情况后,不用回滚,直接提交)

  • 2.并且全局是rollbackonly
    org.springframework.transaction.support.DefaultTransactionStatus#isGlobalRollbackOnly

public class DefaultTransactionStatus extends AbstractTransactionStatus {

    @Nullable
    private final Object transaction;
    
    @Override
    public boolean isGlobalRollbackOnly() {
        return ((this.transaction instanceof SmartTransactionObject) &&
                ((SmartTransactionObject) this.transaction).isRollbackOnly());
    }
}

这里又要满足两个条件

  • 1.这里的transaction是DataSourceTransactionObject
    DataSourceTransaction继承 JdbcTransactionObjectSupport
    JdbcTransactionObjectSupport又实现 SmartTransactionObject,所以第一个条件满足

  • 2.DatSourceTransactionObject的 RollbackOnly 的get和set方法如下

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
        
        public void setRollbackOnly() {
            this.getConnectionHolder().setRollbackOnly();
        }

        public boolean isRollbackOnly() {
            return this.getConnectionHolder().isRollbackOnly();
        }
    }
}

之前B方法抛出异常的时候,就是调用的DataSourceTransactionObject的set方法设置rollbackonly为true,现在再用get方法获取,只要是同一个connectionHolder,A获取到的rollbackOnly就是true,就会触发回滚,执行 this.processRollback(defStatus, true);

org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback

@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    
    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                triggerBeforeCompletion(status);

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                else {
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }

            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
}

到这里 unexpectedRollback为true,就抛出了 "Transaction rolled back because it has been marked as rollback-only"这个异常了。

四、解决方案

原场景,内层事务的异常被外层事务捕获,内层被标记rollback,而外层提交,最后事务提交校验时抛出Transaction rolled back because it has been marked as rollback-only异常

@Transactional(rollbackFor = Exception.class)
public void methodA(){
    insert();
    System.out.println(1 / 0);
}
 
@Transactional(rollbackFor = Exception.class)
public void methodB(){
    try{
        methodA();
    }catch(Exception e){
        System.out.println("有异常");
    }
}

可以分两种情况来解决这个异常

  • 1、如果需要内层异常的情况下,回滚整个事务,可以让内层事务抛出的异常被外层事务的try----catch处理,再抛出新的异常,或者外层不通过try—catch处理这个异常。

  • 2、当然如果内层事务没有复用过,只是在这个地方使用,直接把内层的事务去了,让他和外层合并成一个事务也能解决这个问题。

  • 方案一

@Transactional(rollbackFor = Exception.class)
public void methodB(){
    try{
        insert();
        methodA();
    }catch(Exception e){
        throw new Exception("存在异常")
    }
}

@Transactional(rollbackFor = Exception.class)
public void methodA(){
    System.out.println(1 / 0);
}
  • 方案二
@Transactional(rollbackFor = Exception.class)
public void methodB(){
    try{
        insert();
        methodA();
    }catch(Exception e){
        throw new Exception("存在异常")
    }
}

public void methodA(){
    System.out.println(1 / 0);
}
  • 方案三
    如果内层事务异常的情况下只回滚内层事务,修改内层事务的事务传播方式
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public void methodA(){
    System.out.println(1 / 0);
}
 
@Transactional(rollbackFor = Exception.class)
public void methodB(){
    try{
        insert();
        methodA();
    }catch(Exception e){
        System.out.println("有异常");
    }
}

原文出处:

https://blog.csdn.net/qq_42449106/article/details/130629369

参考:
https://blog.csdn.net/qq_42449106/article/details/130629369

https://www.cnblogs.com/reecelin/p/18434994

https://www.cnblogs.com/super-chao/p/15169923.html

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容