前言
文章详细分析了在Spring框架中,由于事务嵌套导致的Transactionrolledbackbecauseithasbeenmarkedasrollback-only异常的原因和解决办法。问题源于内层事务异常被外层事务捕获,内层事务被标记为回滚,但外层事务尝试提交,引发冲突。解决方案包括让内层事务抛出的异常被外层事务处理后再抛出,或者改变事务的传播行为。
一、背景
业务在执行时,出现报错,日志如下所示:
org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:871)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:708)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:762)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:707)
at com.xtm.obd.mq.OrderPayCloseRocketMQConsumer$$EnhancerBySpringCGLIB$$a41fd4d2.onMessage(<generated>)
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:461)
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:421)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
异常:Transaction rolled back because it has been marked as rollback-only
原因:已经标记为rollback-only,但是后面的程序执行后又commit事务,抛出此异常。虽然都回滚,不影响正常业务。但是日志打印这种异常让人很难受。
二、测试代码:
TestController
@Autowired
TestRollbackService testRollbackService;
@RequestMapping("/test1")
public void test1(){
try{
testRollbackService.test1();
}catch(Exception e){
e.printStackTrace();
}
}
TestRollbackServiceImpl
@Autowired
StudentMapper studentMapper;
@Autowired
TestTransactionService testTransactionService;
@Transactional(rollbackFor = Exception.class)
public void test1(){
Student studentSelect = new Student();
studentSelect.setId(new Long(1));
Student student = studentMapper.selectByPrimaryKey(studentSelect);
try{
testTransactionService.test2();
}catch(Exception e){
e.printStackTrace();
}
}
TestTransactionServiceImpl
@Autowired
StudentMapper studentMapper;
@Transactional(rollbackFor = Exception.class)
public void test2(){
Student studentForInsert = new Student();
studentForInsert.setId(new Long(19));
studentForInsert.setName("测试11");
studentForInsert.setScore(new BigDecimal(69));
studentMapper.updateByPrimaryKey(studentForInsert);
System.out.println(1/0);
}
- TestRollbackService.test1(方法A)中调用了TestTransactionService.test2(方法B),上述代码可以触发回滚异常的报错
- 两个方法都加了事务注解,并且两个方法都会受到到事务管理的拦截器增强,并且事务传播的方式都是默认的,也就是REQUIRED,当已经存在事务的时候就加入事务,没有就创建事务。这里A和B都受事务控制,并且是处于同一个事务的。
- A调用B,A中抓了B的异常,当B发生异常的时候,B的操作应该回滚,但是A吃了异常,A方法中没有产生异常,所以A的操作又应该提交,二者是相互矛盾的。
- spring的事务关联拦截器在抓到B的异常后就会标记rollback-only为true,当A执行完准备提交后,发现rollback-only为true,也会回滚,并抛出异常告诉调用者。
程序时序图如下:
三、原理分析
3.1 程序入口
程序入口肯定是代理类,这里走是cglib的代理
- 入口方法: org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept
该方法中
retVal = (new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
继续调用: org.springframework.aop.framework.ReflectiveMethodInvocation#proceed
然后到了事务管理的拦截器: org.springframework.transaction.interceptor.TransactionInterceptor#invoke
@SuppressWarnings("serial")
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
}
3.2 invokeWithinTransaction(事务管理的主方法)
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
throw new IllegalStateException("Coroutines invocation not supported: " + method);
}
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
Class<?> reactiveType =
(isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type [" +
method.getReturnType() + "] with specified transaction manager: " + tm);
}
return new ReactiveTransactionSupport(adapter);
});
InvocationCallback callback = invocation;
if (corInv != null) {
callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
}
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
if (corInv != null) {
Publisher<?> pr = (Publisher<?>) result;
return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
}
return result;
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
}
程序执行的是最后的else分支,步骤很清晰
- 1.获取 TransactionAttribute
- 2.基于TransactionAttribute获取TransactionManager
- 3.基于TransactionAttribute获取 joinpointIdentification(没研究什么作用)
- 4.基于1,2,3创建的对象获取 TransactionAspectSupport.TransactionInfo,transactionInfo是TransactionAspectSupport的一个内部类
- 5.执行业务方法
- 6.抓到异常就回滚,并清除事务,然后向上抛异常;没有异常就清除事务,然后提交对象之间的关联关系
3.3 各个对象的获取
- TransactionManager的获取比较简单,程序里获取到的其实就是自己配置的bean
- 创建TransactionInfo的过程中要先获取TransactionStatus,TransactionStatus又需要拿到ConnectionHolder
3.3.1 createTransactionIfNecessary
org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
}
3.3.2 获取TransactionStatus
这里先调用status = tm.getTransaction((TransactionDefinition)txAttr)创建TransactionStatus
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
}
3.3.3 获取transactionStatus前先获取DataSourceTransactionObject
- 程序最上面: Object transaction = this.doGetTransaction(); 创建DataSourceTransactionObject对象,这是DataSourceTransactionManager的内部类
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
}
这里还获取了ConnectionHolder对象,这里newConnectionHolder为false
获取的方法如下: org.springframework.transaction.support.TransactionSynchronizationManager#getResource
public abstract class TransactionSynchronizationManager {
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doGetResource(actualKey);
}
}
看代码很有意思,好像是通过一个key获取的,类似于从一个池子里面拿东西一样,其实当A方法执行的时候并没有获取到ConnectionHolder,拿到的是null
org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
resources对象其实是一个ThreadLocal,意思是同一个线程中拿到的ConnectionHolder是相同的
3.3.4 doBegin方法
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
Throwable ex = var7;
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
}
- 截取该方法的重要几行:
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
先获取连接(java.sql.Connection),然后创建ConnectionHolder,newConnectionHolder设置为true,如果之前已经不为空了,newConnectionHolder就为false
如果newConnectionHolder 为 true,还需要将 connectionHolder放到threadLocal里面,让后面的方法可以获取到相同的ConnectionHolder,截取的代码如下:
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
到这里TransactionStatus就创建好了
3.3.5 获取TransactionInfo
org.springframework.transaction.interceptor.TransactionAspectSupport#prepareTransactionInfo
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
}
- 细看 txInfo.bindToThread();
org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#bindToThread
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
protected static final class TransactionInfo {
@Nullable
private TransactionInfo oldTransactionInfo;
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
}
}
bindToThread()的的作用是获取oldTransactionInfo。
3.4 B方法抛异常后的回滚操作
org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
private class ReactiveTransactionSupport {
private final ReactiveAdapter adapter;
private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
logger.error("Application exception overridden by rollback exception", ex);
if (ex2 instanceof TransactionSystemException) {
((TransactionSystemException) ex2).initApplicationException(ex);
}
return ex2;
}
);
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> {
logger.error("Application exception overridden by commit exception", ex);
if (ex2 instanceof TransactionSystemException) {
((TransactionSystemException) ex2).initApplicationException(ex);
}
return ex2;
}
);
}
}
return Mono.empty();
}
}
}
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());调用transactionManager进行rollback
org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback
@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
@Override
public final Mono<Void> rollback(ReactiveTransaction transaction) {
if (transaction.isCompleted()) {
return Mono.error(new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction"));
}
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
return processRollback(synchronizationManager, reactiveTx);
});
}
}
- 进一步调自身的processRollback
org.springframework.transaction.reactive.AbstractReactiveTransactionManager#processRollback
@SuppressWarnings("serial")
public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
private Mono<Void> processRollback(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) {
return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> {
if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
return doRollback(synchronizationManager, status);
}
else {
Mono<Void> beforeCompletion = Mono.empty();
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
beforeCompletion = doSetRollbackOnly(synchronizationManager, status);
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
return beforeCompletion;
}
})).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
.then(Mono.error(ex)))
.then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
.onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
.then(cleanupAfterCompletion(synchronizationManager, status));
}
}
this.triggerBeforeCompletion(status) 这个方法好像释放了连接
B不是新事务,所以最后会执行 this.doSetRollbackOnly(status);
org.springframework.jdbc.datasource.DataSourceTransactionManager#doSetRollbackOnly
public class ReactiveRedissonTransactionManager extends AbstractReactiveTransactionManager {
@Override
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) throws TransactionException {
return Mono.fromRunnable(() -> {
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) status.getTransaction();
to.getResourceHolder().setRollbackOnly();
});
}
}
org.springframework.jdbc.datasource.DataSourceTransactionManager.DataSourceTransactionObject#setRollbackOnly
public abstract class ResourceHolderSupport implements ResourceHolder {
private boolean rollbackOnly = false;
public void setRollbackOnly() {
this.rollbackOnly = true;
}
}
这里可以看到最终设置的是connectionHolder的rollbackonly属性
3.5 A方法尝试提交时的操作
org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
}
同样的,这里调用transactionManager进行提交
org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
}
这个方法判断了一些无法提交的情况,程序这里走第二个分支,部分代码如下:
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
判断条件为:
1.全局不是rollbackonly的时候也提交(这个可能是一个配置的参数,配合在rollbackonly的时候也提交,也就是出现现在这种情况后,不用回滚,直接提交)
2.并且全局是rollbackonly
org.springframework.transaction.support.DefaultTransactionStatus#isGlobalRollbackOnly
public class DefaultTransactionStatus extends AbstractTransactionStatus {
@Nullable
private final Object transaction;
@Override
public boolean isGlobalRollbackOnly() {
return ((this.transaction instanceof SmartTransactionObject) &&
((SmartTransactionObject) this.transaction).isRollbackOnly());
}
}
这里又要满足两个条件
1.这里的transaction是DataSourceTransactionObject
DataSourceTransaction继承 JdbcTransactionObjectSupport
JdbcTransactionObjectSupport又实现 SmartTransactionObject,所以第一个条件满足2.DatSourceTransactionObject的 RollbackOnly 的get和set方法如下
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
public void setRollbackOnly() {
this.getConnectionHolder().setRollbackOnly();
}
public boolean isRollbackOnly() {
return this.getConnectionHolder().isRollbackOnly();
}
}
}
之前B方法抛出异常的时候,就是调用的DataSourceTransactionObject的set方法设置rollbackonly为true,现在再用get方法获取,只要是同一个connectionHolder,A获取到的rollbackOnly就是true,就会触发回滚,执行 this.processRollback(defStatus, true);
org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback
@SuppressWarnings("serial")
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
}
到这里 unexpectedRollback为true,就抛出了 "Transaction rolled back because it has been marked as rollback-only"这个异常了。
四、解决方案
原场景,内层事务的异常被外层事务捕获,内层被标记rollback,而外层提交,最后事务提交校验时抛出Transaction rolled back because it has been marked as rollback-only异常
@Transactional(rollbackFor = Exception.class)
public void methodA(){
insert();
System.out.println(1 / 0);
}
@Transactional(rollbackFor = Exception.class)
public void methodB(){
try{
methodA();
}catch(Exception e){
System.out.println("有异常");
}
}
可以分两种情况来解决这个异常
1、如果需要内层异常的情况下,回滚整个事务,可以让内层事务抛出的异常被外层事务的try----catch处理,再抛出新的异常,或者外层不通过try—catch处理这个异常。
2、当然如果内层事务没有复用过,只是在这个地方使用,直接把内层的事务去了,让他和外层合并成一个事务也能解决这个问题。
方案一
@Transactional(rollbackFor = Exception.class)
public void methodB(){
try{
insert();
methodA();
}catch(Exception e){
throw new Exception("存在异常")
}
}
@Transactional(rollbackFor = Exception.class)
public void methodA(){
System.out.println(1 / 0);
}
- 方案二
@Transactional(rollbackFor = Exception.class)
public void methodB(){
try{
insert();
methodA();
}catch(Exception e){
throw new Exception("存在异常")
}
}
public void methodA(){
System.out.println(1 / 0);
}
- 方案三
如果内层事务异常的情况下只回滚内层事务,修改内层事务的事务传播方式
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public void methodA(){
System.out.println(1 / 0);
}
@Transactional(rollbackFor = Exception.class)
public void methodB(){
try{
insert();
methodA();
}catch(Exception e){
System.out.println("有异常");
}
}
原文出处:
https://blog.csdn.net/qq_42449106/article/details/130629369
参考:
https://blog.csdn.net/qq_42449106/article/details/130629369