事务管理器:
public interface PlatformTransactionManager {
TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException;
void commit(TransactionStatus var1) throws TransactionException;
void rollback(TransactionStatus var1) throws TransactionException;
}
这个接口很重要却很简单,就三个方法,获取事务/提交/回滚。但实现比较复杂。它的一个抽象实现类就是AbstractPlatformTransactionManager,这个实现类实现了事务管理的整个逻辑关系流程,把涉及和具体事务打交道的东西定义为抽象方法让子类去实现。
那么对应单个数据库事务的具体实现类就是DataSourceTransactionManager,这个类会完成这些和事务相关的具体的操作。
获取食物方法的整理流程如下:
1、调用doGetTransaction()方法从当前上下文中获取事务对象transaction。
2、事务已经存在:
- 如果此时是事务传播特性为NEVER,抛出异常
- 如果此时事务传播特性是NOT_SUPPORTED,则调用suspend(transaction)方法挂起当前事务,将被挂起的资源suspendedResurces放入事务状态里。
- 如果此时事务状态是REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象和被挂起的资源suspendedResources放入到事务状态里。再调用doBegin(transaction,definition)方法去真正的打开事务。最后调用prepareSynchronized(status, definition)方法准备事务同步。
- 如果此时事务传播特性是NESTED,有分为三种情况:
1)如果不允许嵌套事务,直接抛出异常
2)如果使用保存点(savePoint)来实现嵌套事务,那直接使用当前事务,创建一个保存点。
3)如果使用新事务来实现嵌套事务,调用doBegin(transaction,definition)开启新事务,不需要挂起当前事务,调用prepareSynchronized(status, definition)方法准备事务同步。
对于剩下的三种传播特性REQUIRES/MANDATORY/SUPPORTS,则不需要创建新事务,直接使用当前事务。
3、如果事务不存在:
- 如果此时事务的传播特性是MANDATORY,则抛出异常。
- 如果此时传播特性是REQUIRES/REQUIRES_NEW/NESTED,则调用suspend(null)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResurces都放入到事务状态里。调用doBegin(transaction,definition)开启新事务,再调用prepareSynchronized(status, definition)方法准备事务同步。
- 剩下的三种传播特性SUPPORTS/NOT_SUPPORTED/NEVER,则不需要操作事务。
获取事务对象的方法如下:
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
1、事务对象,DataSourceTransactionObject从类名上看就是采用数据源实现的事务对象。这个事务对象主要保存了一个ConnectionHandler对象。
2、使用DataSource获取ConnectionHandler对象,从当前执行的上下文,即ThreadLocal里获取。
public abstract class TransactionSynchronizationManager {
private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");
三点需要注意:
1)事务性资源是存储在Map<Object, Object>里,key就是DataSource对象,value就是ConnectionHolder对象
2)事务同步这个集合Set<TransactionSynchronization>只有在多个数据源的分布式事务时才使用
3)剩下的是四个和事务相关的变量,事务名称/是否只读/隔离级别/是否激活
3、将上一步获取的ConnectionHolder对象(也可能是null)放入事务对象中。
我们来看ConnectionHandler的定义:
public class ConnectionHolder extends ResourceHolderSupport {
public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";
private ConnectionHandle connectionHandle;
private Connection currentConnection;
private boolean transactionActive;
private Boolean savepointsSupported;
private int savepointCounter;
public abstract class ResourceHolderSupport implements ResourceHolder {
private boolean synchronizedWithTransaction = false;
private boolean rollbackOnly = false;
private Date deadline;
private int referenceCount = 0;
一是它包含一个数据库链接(Connection),因此可以认为它就是表示一个物理事务。
二是它包含一个引用计数(referenceCount),来指示它被引用次数,表示当前有多少个逻辑事务关联到它(在单数据源时并没有使用该字段)
接着上面的源码往下:接着是判断一个事务是否已存在
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
return txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive();
}
即事务对象要关联到一个物理事务(即有一个ConnectionHolder对象),同时物理事务还必须是活动的。
那么在首次执行时,事务肯定是不存在的,因为从线程的ThreadLocal里没有取出ConnectionHolder对象。那就新开一个事务,不过首先要看一下如何挂起一个事务:
protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List suspendedSynchronizations = this.doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = this.doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName((String)null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException var8) {
this.doResumeSynchronization(suspendedSynchronizations);
throw var8;
} catch (Error var9) {
this.doResumeSynchronization(suspendedSynchronizations);
throw var9;
}
} else if (transaction != null) {
Object suspendedResources = this.doSuspend(transaction);
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);
} else {
return null;
}
}
分三种情况:
1、事务同步是活动的,即物理事务已经被绑定到线程:
1.1、doSuspendSynchronization()方法返回的List是空的(多数据源分布式事务时才不为空)
1.2、doSuspend(transaction)去挂起当前事务。
1.3、从ThreadLocal里取出值,并同时清空ThreadLocal。
1.4、将这些值保存在SuspendedResourcesHolder类中,表示这些都是被挂起的资源。
2、物理事务是活动的,但是还没有绑定到线程,此时只需挂起事务就行了。
3、没有物理事务,什么都不做,返回null即可。
doSuspend具体方法如下:
protected Object doSuspend(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
txObject.setConnectionHolder((ConnectionHolder)null);
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.unbindResource(this.dataSource);
return conHolder;
}
包括两步:
1、将(逻辑)事务对象中关联的物理事务ConnectionHolder清空。
2、在线程的ThreadLocal里的Map<Object, Object>中删除这对DataSource->ConnectionHolder的映射。同时将ConnectionHolder返回。
这个被返回的ConnectionHolder(物理事务),就是上面中被挂起的资源suspendedResources。
下面开启一个新的事务
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
如果逻辑事务没有和一个物理资源相关联,或这个物理资源还没有和一个事务同步,此时新建一个数据库链接,并把这个链接包装到一个ConnectionHolder里。并设置给事务对象。
接下来将这个物理资源ConnectionHolder标记为已同步一个事务。然后将数据库链接设置为非自动提交。最后把DataSource和ConnectionHolder绑定到当前线程。
一个事务获取后,返回的结果是一个事务状态:
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
一是它包含了逻辑事务对象(已关联了物理事务)。
二是需要表明这个事务是一个新开启的物理事务,还是参与到已有的物理事务。
三是它包含了被挂起的(上一个)物理事务对象(如果有的话).
提交方法:
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Transactional code has requested rollback");
}
this.processRollback(defStatus);
} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
this.processRollback(defStatus);
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} else {
this.processCommit(defStatus);
}
}
}
1、如果事务已完成则抛出异常
2、主动回滚。
3、被动回滚,因为全局范围被设置了回滚。
4、进入提交事务。
提交事务方法:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
this.prepareForCommit(status);
this.triggerBeforeCommit(status);
this.triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction commit");
}
this.doCommit(status);
}
if (globalRollbackOnly) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException var19) {
this.triggerAfterCompletion(status, 1);
throw var19;
} catch (TransactionException var20) {
if (this.isRollbackOnCommitFailure()) {
this.doRollbackOnCommitException(status, var20);
} else {
this.triggerAfterCompletion(status, 2);
}
throw var20;
} catch (RuntimeException var21) {
if (!beforeCompletionInvoked) {
this.triggerBeforeCompletion(status);
}
this.doRollbackOnCommitException(status, var21);
throw var21;
} catch (Error var22) {
if (!beforeCompletionInvoked) {
this.triggerBeforeCompletion(status);
}
this.doRollbackOnCommitException(status, var22);
throw var22;
}
try {
this.triggerAfterCommit(status);
} finally {
this.triggerAfterCompletion(status, 0);
}
} finally {
this.cleanupAfterCompletion(status);
}
}
1、如果创建了保存点,就将其释放掉,因为现在已经在提交流程中,保存点已经没有用了。
2、只有当前的逻辑事务新打开了物理事务时才提交。只是参与到已存在的物理事务中时不提交(因为这个物理事务还对应了其它没有执行完的逻辑事务)
具体的事务提交:
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
this.logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
} catch (SQLException var5) {
throw new TransactionSystemException("Could not commit JDBC transaction", var5);
}
}
从事务状态中获取保存的逻辑事务对象,再获取它关联的物理事务,再获取关联的数据库链接,最后执行commit操作.
rollback方法:
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
this.processRollback(defStatus);
}
}
如果事务已完成则抛出异常,否则执行回滚;
回滚具体代码:
private void processRollback(DefaultTransactionStatus status) {
try {
try {
this.triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction rollback");
}
this.doRollback(status);
} else if (status.hasTransaction()) {
if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
this.logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
} else {
if (status.isDebug()) {
this.logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
this.doSetRollbackOnly(status);
}
} else {
this.logger.debug("Should roll back transaction but cannot - no transaction available");
}
} catch (RuntimeException var7) {
this.triggerAfterCompletion(status, 2);
throw var7;
} catch (Error var8) {
this.triggerAfterCompletion(status, 2);
throw var8;
}
this.triggerAfterCompletion(status, 1);
} finally {
this.cleanupAfterCompletion(status);
}
}
1、如果有保存点的回滚到保存点。
2、如果是新开的物理事务,则进行回滚。
3、若是参与到已有的事务中,只能标记为回滚
具体的执行回滚和标记回滚如下:
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
this.logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
} catch (SQLException var5) {
throw new TransactionSystemException("Could not roll back JDBC transaction", var5);
}
}
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
if (status.isDebug()) {
this.logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only");
}
txObject.setRollbackOnly();
}
无论是提交还是回滚,都表示一个事务的完成。如果它之前有挂起的事务,则需要进行恢复:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
this.doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
this.logger.debug("Resuming suspended transaction after completion of inner transaction");
}
this.resume(status.getTransaction(), (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());
}
}
事务恢复逻辑:
protected final void resume(Object transaction, AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
this.doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
this.doResumeSynchronization(suspendedSynchronizations);
}
}
}
其中被挂起的资源就是ConnectionHolder对象。
具体恢复操作如下:
protected void doResume(Object transaction, Object suspendedResources) {
ConnectionHolder conHolder = (ConnectionHolder)suspendedResources;
TransactionSynchronizationManager.bindResource(this.dataSource, conHolder);
}
把DataSource->ConnectionHolder重新绑定到线程的ThreadLocal里的Map<Object, Object>中。(转)