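1 Overview
In Spring transactions we can configure a propagation behavior for each transaction. When a transaction already exists, the propagation behavior is handled in AbstractPlatformTransactionManager.handleExistingTransaction; see the source code for details. The propagation behaviors themselves are defined in the official Spring documentation under Transaction Propagation. For PROPAGATION_NESTED, the subject of this article, the documentation says: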
PROPAGATION_NESTED uses a single physical transaction with multiple savepoints that it can roll back to. Such partial rollbacks let an inner transaction scope trigger a rollback for its scope, with the outer transaction being able to continue the physical transaction despite some operations having been rolled back. This setting is typically mapped onto JDBC savepoints, so it works only with JDBC resource transactions. See Spring's DataSourceTransactionManager.
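In other words, Spring uses a single physical transaction and relies on the savepoint mechanism (MySQL also calls these savepoints) to roll back a specific range of operations inside that transaction without aborting the transaction as a whole.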
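To make "one physical transaction plus a savepoint" concrete, here is a minimal sketch of the plain JDBC call sequence that a savepoint-based nested scope comes down to. It is an illustration, not Spring's code: the class SavepointSequenceSketch and its methods are hypothetical names, and recordingConnection is a stand-in built with java.lang.reflect.Proxy that only records method names, so the example runs without any database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.List;

class SavepointSequenceSketch {

    // The JDBC calls behind "one physical transaction plus a savepoint".
    static void outerWithNestedScope(Connection conn, boolean innerFails) throws SQLException {
        conn.setAutoCommit(false);           // begin the single physical transaction
        Savepoint sp = conn.setSavepoint();  // enter the nested scope
        if (innerFails) {
            conn.rollback(sp);               // undo only the work done since the savepoint
        } else {
            conn.releaseSavepoint(sp);       // keep the work; it now belongs to the outer scope
        }
        conn.commit();                       // the outer scope decides the final outcome
    }

    // Hypothetical stand-in for a real connection: records method names, touches no database.
    static Connection recordingConnection(List<String> calls) {
        ClassLoader loader = SavepointSequenceSketch.class.getClassLoader();
        Savepoint sp = (Savepoint) Proxy.newProxyInstance(
                loader, new Class<?>[] {Savepoint.class}, (p, m, a) -> null);
        return (Connection) Proxy.newProxyInstance(
                loader, new Class<?>[] {Connection.class}, (p, m, a) -> {
                    calls.add(m.getName());
                    return m.getName().equals("setSavepoint") ? sp : null;
                });
    }

    // Runs the sequence against the recording stand-in and returns the recorded calls.
    static List<String> trace(boolean innerFails) {
        List<String> calls = new ArrayList<>();
        try {
            outerWithNestedScope(recordingConnection(calls), innerFails);
        } catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));   // [setAutoCommit, setSavepoint, rollback, commit]
        System.out.println(trace(false));  // [setAutoCommit, setSavepoint, releaseSavepoint, commit]
    }
}
```

In both cases there is exactly one commit at the end: the nested scope never commits anything by itself, it only sets, releases, or rolls back to a savepoint.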
2 Preparing to create a savepoint
How Spring uses AOP to implement transaction control is not covered in detail here. Tracing the source code gives the following call chain:
TransactionInterceptor.invoke -> TransactionAspectSupport.invokeWithinTransaction -> TransactionAspectSupport.createTransactionIfNecessary -> AbstractPlatformTransactionManager.getTransaction -> AbstractPlatformTransactionManager.doGetTransaction
doGetTransaction is abstract in AbstractPlatformTransactionManager, so we look at the implementation in its subclass DataSourceTransactionManager, where the chain continues:
//DataSourceTransactionManager
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
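//See the DataSourceTransactionManager constructor listed below:
//isNestedTransactionAllowed() returns true here,
//i.e. nested transactions are supported by default,
//and nested transactions are in turn implemented with savepoints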
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
/**
* Create a new DataSourceTransactionManager instance.
* A DataSource has to be set to be able to use it.
* @see #setDataSource
*/
public DataSourceTransactionManager() {
setNestedTransactionAllowed(true);
}
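As shown above, when nested transactions are allowed, the savepointAllowed flag of the newly created DataSourceTransactionObject is set to true, so its isSavepointAllowed() returns true.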
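3 Creating the savepoint
Following on from the overview in section 1: when the propagation behavior is PROPAGATION_NESTED and a transaction already exists at the time a new one is requested, a nested transaction is created:
//AbstractPlatformTransactionManager
//Unrelated code omitted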
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
...
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//If this transaction manager does not allow nested transactions,
//throw an exception immediately
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//Check whether this transaction manager implements nested transactions with savepoints
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//Create the savepoint and hold it in the status
status.createAndHoldSavepoint();
return status;
}
else {
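//Implement the nested transaction with nested begin and commit/rollback calls.
//MySQL does not support this: issuing BEGIN again before the current
//transaction has been committed makes MySQL commit it implicitly, so the
//nesting effect is lost. Some other systems may support it; not covered here.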
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
...
}
For the implicit commit that MySQL performs on BEGIN, see the section Statements That Cause an Implicit Commit in the official MySQL documentation.
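The PROPAGATION_NESTED branch of handleExistingTransaction shown above can be condensed into a small decision function. This is a simplified sketch, not Spring's code: NestedBranchSketch and its Action enum are names made up for illustration, and the two boolean parameters stand in for the results of isNestedTransactionAllowed() and useSavepointForNestedTransaction().

```java
class NestedBranchSketch {

    /** What happens when PROPAGATION_NESTED meets an already existing transaction. */
    enum Action { REJECT, CREATE_SAVEPOINT, NESTED_BEGIN }

    static Action decide(boolean nestedAllowed, boolean useSavepoint) {
        if (!nestedAllowed) {
            // Corresponds to throwing NestedTransactionNotSupportedException.
            return Action.REJECT;
        }
        // Savepoint inside the existing transaction, or a nested begin (usually JTA only).
        return useSavepoint ? Action.CREATE_SAVEPOINT : Action.NESTED_BEGIN;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true));    // CREATE_SAVEPOINT
        System.out.println(decide(true, false));   // NESTED_BEGIN
        System.out.println(decide(false, true));   // REJECT
    }
}
```

With DataSourceTransactionManager, whose constructor enables nested transactions, the expected path is the first one, CREATE_SAVEPOINT.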
DefaultTransactionStatus.createAndHoldSavepoint is implemented in its parent class AbstractTransactionStatus:
//AbstractTransactionStatus
//How the savepoint is actually created is not expanded here; see the source code
/**
* Create a savepoint and hold it for the transaction.
* @throws org.springframework.transaction.NestedTransactionNotSupportedException
* if the underlying transaction does not support savepoints
*/
public void createAndHoldSavepoint() throws TransactionException {
setSavepoint(getSavepointManager().createSavepoint());
}
After creating the savepoint, DefaultTransactionStatus also holds on to it, which is what the setSavepoint call above does.
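4 Releasing or rolling back the savepoint
4.1 Releasing the savepoint on commit
When the transactional method completes, the transaction is committed through AbstractPlatformTransactionManager.commit: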
//AbstractPlatformTransactionManager
/**
* This implementation of commit handles participating in existing
* transactions and programmatic rollback requests.
* Delegates to {@code isRollbackOnly}, {@code doCommit}
* and {@code rollback}.
* @see org.springframework.transaction.TransactionStatus#isRollbackOnly()
* @see #doCommit
* @see #rollback
*/
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
We first look at the normal commit path, processCommit:
//AbstractPlatformTransactionManager
/**
* Process an actual commit.
* Rollback-only flags have already been checked and applied.
* @param status object representing the transaction
* @throws TransactionException in case of commit failure
*/
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
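//If this status holds a savepoint, we are committing a nested transaction
//inside an outer transaction, so "committing" it just releases the savepoint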
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
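//Release the held savepoint
status.releaseHeldSavepoint();
}//Otherwise, if this status started a new transaction (the outermost one, or a nested one begun through begin and commit/rollback), do a real commit; the latter case is not covered here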
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
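The commit path above can be summarized as a short decision table. The sketch below is a deliberate simplification under stated assumptions, not Spring's implementation: CommitPathSketch and Outcome are hypothetical names, the single rollbackOnly flag merges the local and global rollback-only checks made in commit, and synchronization callbacks as well as UnexpectedRollbackException handling are left out.

```java
class CommitPathSketch {

    /** Simplified outcomes of calling commit on a transaction status. */
    enum Outcome { ROLLBACK, RELEASE_SAVEPOINT, COMMIT, NOTHING }

    static Outcome onCommit(boolean rollbackOnly, boolean hasSavepoint, boolean newTransaction) {
        if (rollbackOnly) {
            return Outcome.ROLLBACK;            // commit() diverts to processRollback
        }
        if (hasSavepoint) {
            return Outcome.RELEASE_SAVEPOINT;   // nested scope: no physical commit happens
        }
        if (newTransaction) {
            return Outcome.COMMIT;              // owner of the physical transaction: doCommit
        }
        return Outcome.NOTHING;                 // plain participant: the owner commits later
    }

    public static void main(String[] args) {
        System.out.println(onCommit(false, true, false));   // RELEASE_SAVEPOINT
        System.out.println(onCommit(false, false, true));   // COMMIT
        System.out.println(onCommit(false, false, false));  // NOTHING
        System.out.println(onCommit(true, true, false));    // ROLLBACK
    }
}
```

The point to take away is the second branch: for a savepoint-based nested transaction, a successful "commit" never reaches the database as a COMMIT; the data becomes durable only when the outer transaction commits.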
4.2 Rolling back to the savepoint
The rollback performed when an exception occurs is implemented as follows:
//AbstractPlatformTransactionManager
/**
* Process an actual rollback.
* The completed flag has already been checked.
* @param status object representing the transaction
* @throws TransactionException in case of rollback failure
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
//If this status holds a savepoint, roll back only to that savepoint,
//which is how a partial rollback is achieved
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
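The effect of rolling back to a held savepoint can be illustrated with a toy in-memory model. Everything in it is an assumption made for illustration: PartialRollbackSketch is a hypothetical class, the "physical transaction" is just a list of pending writes, and a savepoint is modeled as the number of writes that were pending when it was taken. No database or Spring API is involved.

```java
import java.util.ArrayList;
import java.util.List;

class PartialRollbackSketch {

    /** Toy stand-in for one physical transaction: an ordered list of pending writes. */
    private final List<String> pending = new ArrayList<>();

    void write(String row) {
        pending.add(row);
    }

    /** A savepoint is modeled as the number of writes pending when it was taken. */
    int createSavepoint() {
        return pending.size();
    }

    /** Discards every write made after the savepoint and keeps the earlier ones. */
    void rollbackTo(int savepoint) {
        pending.subList(savepoint, pending.size()).clear();
    }

    /** Commits the physical transaction and returns what was made durable. */
    List<String> commit() {
        return List.copyOf(pending);
    }

    static List<String> run(boolean innerFails) {
        PartialRollbackSketch tx = new PartialRollbackSketch();
        tx.write("outer-1");
        int savepoint = tx.createSavepoint();   // entering the nested scope
        tx.write("inner-1");
        if (innerFails) {
            tx.rollbackTo(savepoint);           // only the nested scope's write is undone
        }
        tx.write("outer-2");                    // the outer scope carries on either way
        return tx.commit();
    }

    public static void main(String[] args) {
        System.out.println(run(true));    // [outer-1, outer-2]
        System.out.println(run(false));   // [outer-1, inner-1, outer-2]
    }
}
```

This matches the behavior quoted from the Spring documentation in section 1: the inner scope rolls back its own work while the outer transaction continues and commits the rest.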