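Preface
- How Spring obtains the transaction advisor is covered in a separate article, "How Spring transactions obtain the transaction advisor".
- This article analyzes Spring's declarative transactions, starting from the transaction advisor. Every method annotated with @Transactional is intercepted.
Experiments: nested transactions with different propagation settings
Principles: sequence diagram of the overall Spring transaction flow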
TransactionInterceptor
- It is a method interceptor, so the analysis starts directly from its invoke method.
public class TransactionInterceptor extends TransactionAspectSupport implements
        MethodInterceptor, Serializable {
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }
}
- invoke delegates to invokeWithinTransaction, which is defined in TransactionAspectSupport.
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    // Get the transaction attribute for this method
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // Get the transaction manager bean
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // Build the join point identifier: class name + method name
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // Standard (declarative) transaction handling
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Create the transaction info; nested transactions, rollback and commit all rely on it
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // Proceed with the invocation, which ends in the target method
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // Handle the exception: roll back (subject to the rollback rules)
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Clean up the transaction info
            cleanupTransactionInfo(txInfo);
        }
        // Commit
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // The callback-based (programmatic) branch is omitted here
}
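- The core flow here is:
  - Step 1: get the transaction attributes.
  - Step 2: load the configured TransactionManager.
  - Step 3: choose a branch by handling style. The callback-based branch is taken only when a transaction attribute is present and the manager is a CallbackPreferringPlatformTransactionManager, which exposes a method that runs the invocation as a callback. Everything else, including ordinary declarative transactions, takes the standard branch.
  - Step 4: collect the transaction info.
  - Step 5: invoke the target method.
  - Step 6: handle exceptions.
  - Step 7: clean up the transaction info before committing.
  - Step 8: commit the transaction.
The following sections analyze these steps one by one.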
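The ordering in the core flow can be illustrated without Spring at all. The sketch below is a simplified model, not Spring's implementation: RecordingTxManager, Work and runInTransaction are hypothetical names, and the sketch always rolls back on failure, whereas the real interceptor consults the rollback rules of the transaction attribute.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transaction manager; it only records calls. */
final class RecordingTxManager {
    final List<String> events = new ArrayList<>();
    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
    void cleanup()  { events.add("cleanup"); }
}

/** Hypothetical callback, playing the role of proceedWithInvocation(). */
interface Work<T> {
    T run() throws Exception;
}

final class AroundTxSketch {

    /** Same ordering as the standard branch of invokeWithinTransaction. */
    static <T> T runInTransaction(RecordingTxManager tm, Work<T> work) throws Exception {
        tm.begin();                 // step 4: createTransactionIfNecessary
        T result;
        try {
            result = work.run();    // step 5: the target method
        } catch (Exception ex) {
            tm.rollback();          // step 6: completeTransactionAfterThrowing
            throw ex;
        } finally {
            tm.cleanup();           // step 7: runs on both the success and failure path
        }
        tm.commit();                // step 8: commitTransactionAfterReturning
        return result;
    }

    /** Runs the work and returns the recorded event order, swallowing the failure. */
    static List<String> trace(Work<String> work) {
        RecordingTxManager tm = new RecordingTxManager();
        try {
            runInTransaction(tm, work);
        } catch (Exception ignored) {
            // only the event order matters for this demonstration
        }
        return tm.events;
    }

    public static void main(String[] args) {
        System.out.println(trace(() -> "ok"));
        System.out.println(trace(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

On the success path the recorded order is begin, cleanup, commit; on the failure path it is begin, rollback, cleanup. Cleanup sits in the finally block, which is why it runs before the commit.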
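Getting the transaction attributes (core flow step 1)
- The getTransactionAttribute method of the TransactionAttributeSource interface returns the attribute for a method.
- AbstractFallbackTransactionAttributeSource (whose subclass is AnnotationTransactionAttributeSource) implements getTransactionAttribute for the annotation-based case.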
@Override
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
    // First, see if we have a cached value.
    Object cacheKey = getCacheKey(method, targetClass);
    Object cached = this.attributeCache.get(cacheKey);
    if (cached != null) {
        // Cache hit: return the cached value (body omitted)
    }
    else {
        // Cache miss: compute the attribute
        TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
        // Put it in the cache.
        if (txAttr == null) {
            // Cache a marker for "no attribute" so the lookup is not repeated for this method
            this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
        }
        else {
            // Cache the attribute
            this.attributeCache.put(cacheKey, txAttr);
        }
        return txAttr;
    }
}
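The "marker for no attribute" idea is easy to miss, so here is a minimal, framework-free sketch of it. NullMarkerCacheSketch and its computeCalls counter are hypothetical names used only for illustration. The point is that a map such as ConcurrentHashMap rejects null values, so a negative result has to be stored as a sentinel object; otherwise every lookup for a non-transactional method would recompute the answer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache that remembers "nothing found" by storing a sentinel. */
final class NullMarkerCacheSketch<K, V> {

    /** Sentinel meaning "computed, and the result was null". */
    private static final Object NULL_MARKER = new Object();

    private final Map<K, Object> cache = new ConcurrentHashMap<>();
    private final Function<K, V> compute;

    /** Counts how often the expensive computation actually ran. */
    final AtomicInteger computeCalls = new AtomicInteger();

    NullMarkerCacheSketch(Function<K, V> compute) {
        this.compute = compute;
    }

    @SuppressWarnings("unchecked")
    V get(K key) {
        Object cached = cache.get(key);
        if (cached != null) {
            // Cache hit: translate the sentinel back into null
            return cached == NULL_MARKER ? null : (V) cached;
        }
        // Cache miss: compute once and store either the value or the sentinel
        computeCalls.incrementAndGet();
        V value = compute.apply(key);
        cache.put(key, value == null ? NULL_MARKER : value);
        return value;
    }

    public static void main(String[] args) {
        NullMarkerCacheSketch<String, String> cache =
                new NullMarkerCacheSketch<>(name -> name.startsWith("tx") ? "REQUIRED" : null);
        cache.get("plainMethod");
        cache.get("plainMethod");
        System.out.println("compute calls: " + cache.computeCalls.get());
    }
}
```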
- The computeTransactionAttribute method
protected TransactionAttribute computeTransactionAttribute(Method method, Class<?> targetClass) {
    // (Omitted: specificMethod is resolved from method and targetClass.)
    // First, look on the target method itself
    TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    if (txAttr != null) {
        return txAttr;
    }
    // Second, look on the class that declares the target method
    txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
        return txAttr;
    }
    // Third, fall back to the original method (for example, the interface method) and look for @Transactional there
    if (specificMethod != method) {
        ....
    }
    return null;
}
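The lookup order (target method, then its class, then the interface) can be tried out with plain reflection. This is only a sketch under stated assumptions: @Tx is a hypothetical annotation standing in for @Transactional, OrderService and its implementations are made up, and the sketch skips details of the real method such as the isUserLevelMethod check.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical marker annotation standing in for @Transactional. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface Tx {
    String value();
}

interface OrderService {
    @Tx("interface-method")
    void place();

    void cancel();
}

@Tx("class-level")
class DefaultOrderService implements OrderService {
    @Tx("target-method")
    @Override public void place() { }

    @Override public void cancel() { }
}

/** No annotations at all: lookups fall through to the interface. */
class PlainOrderService implements OrderService {
    @Override public void place() { }

    @Override public void cancel() { }
}

final class AttributeLookupSketch {

    static String find(Method interfaceMethod, Class<?> targetClass) {
        // Resolve the implementation of the method on the target class
        Method specific;
        try {
            specific = targetClass.getMethod(interfaceMethod.getName(), interfaceMethod.getParameterTypes());
        } catch (NoSuchMethodException ex) {
            specific = interfaceMethod;
        }
        // 1. the target method itself
        Tx tx = specific.getAnnotation(Tx.class);
        if (tx != null) {
            return tx.value();
        }
        // 2. the class that declares the target method
        tx = specific.getDeclaringClass().getAnnotation(Tx.class);
        if (tx != null) {
            return tx.value();
        }
        // 3. the original (interface) method, then the interface itself
        if (!specific.equals(interfaceMethod)) {
            tx = interfaceMethod.getAnnotation(Tx.class);
            if (tx != null) {
                return tx.value();
            }
            tx = interfaceMethod.getDeclaringClass().getAnnotation(Tx.class);
            if (tx != null) {
                return tx.value();
            }
        }
        return null;
    }

    /** Convenience wrapper that hides the checked reflection exception. */
    static String lookup(String methodName, Class<?> targetClass) {
        try {
            return find(OrderService.class.getMethod(methodName), targetClass);
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup("place", DefaultOrderService.class));
        System.out.println(lookup("cancel", DefaultOrderService.class));
        System.out.println(lookup("place", PlainOrderService.class));
        System.out.println(lookup("cancel", PlainOrderService.class));
    }
}
```

The most specific declaration wins: an annotation on the implementing method shadows the class-level one, and the interface is consulted only when the implementation carries nothing.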
Collecting the transaction info (core flow step 4)
- Sequence diagram: collecting the transaction info
- TransactionAspectSupport.createTransactionIfNecessary is called and returns a TransactionInfo.
protected TransactionInfo createTransactionIfNecessary(
        PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    // If the attribute has no name, wrap it in a DelegatingTransactionAttribute
    // whose getName() returns the join point identifier
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // Get the transaction: the core of transaction creation
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
    // Build the TransactionInfo and return it
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
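- The overall flow is (the leading 4 refers to core flow step 4):
  4.1 Wrap the TransactionAttribute in a DelegatingTransactionAttribute, so that an attribute without a name reports the join point identifier as its name.
  4.2 Get the transaction. This is the core, and it also covers nested transactions. The key method is doBegin.
  4.3 Build the TransactionInfo and return it, that is, put the TransactionStatus into the TransactionInfo.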
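Step 4.1 is a plain delegation pattern: wrap the attribute and override one getter. A minimal sketch follows, assuming a cut-down, hypothetical TxAttributeSketch interface with only two getters (the real TransactionAttribute has more).

```java
/** Hypothetical, cut-down attribute interface used only in this sketch. */
interface TxAttributeSketch {
    String getName();
    int getTimeout();
}

/** Forwards every call to the wrapped attribute; subclasses override selectively. */
class DelegatingTxAttributeSketch implements TxAttributeSketch {
    private final TxAttributeSketch target;

    DelegatingTxAttributeSketch(TxAttributeSketch target) {
        this.target = target;
    }

    @Override public String getName() { return target.getName(); }
    @Override public int getTimeout() { return target.getTimeout(); }
}

final class NamedAttributeSketch {

    /** Supplies the join point identifier as the name when the attribute has none. */
    static TxAttributeSketch withDefaultName(TxAttributeSketch attr, final String joinpointId) {
        if (attr != null && attr.getName() == null) {
            return new DelegatingTxAttributeSketch(attr) {
                @Override public String getName() {
                    return joinpointId;
                }
            };
        }
        return attr;
    }

    /** Small factory for test attributes. */
    static TxAttributeSketch attribute(final String name, final int timeout) {
        return new TxAttributeSketch() {
            @Override public String getName() { return name; }
            @Override public int getTimeout() { return timeout; }
        };
    }

    public static void main(String[] args) {
        TxAttributeSketch wrapped = withDefaultName(attribute(null, 30), "com.example.OrderService.place");
        System.out.println(wrapped.getName() + " timeout=" + wrapped.getTimeout());
    }
}
```

Only getName changes; every other property still comes from the original attribute, and an attribute that already has a name is returned untouched.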
Getting the transaction (core flow step 4.2)
- Sequence diagram: getting the transaction (core flow step 4.2)
- The AbstractPlatformTransactionManager.getTransaction method
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // Get the transaction object
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
    // Check whether the current thread already has a transaction: the connection holder
    // recorded for the thread is not null and its transactionActive flag is true
    if (isExistingTransaction(transaction)) {
        // Handle the existing (outer) transaction
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    // Validate the timeout
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // No existing transaction, but the propagation is PROPAGATION_MANDATORY: throw
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // The following propagation behaviors all start a new transaction
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // There is no transaction to suspend, hence suspend(null)
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Complete the transaction object: set the ConnectionHolder, isolation level and timeout;
            // a newly obtained connection is bound to the current thread
            doBegin(transaction, definition);
            // Set up transaction synchronization for the current thread
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
        }
        catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
- The flow can be summarized as:
4.2.1 Get the transaction object. Here DataSourceTransactionManager.doGetTransaction creates the JDBC-based transaction object.
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Reuse the connection already bound to the current thread, if there is one
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    // false: this is not a newly created connection holder
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
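The lookup through TransactionSynchronizationManager.getResource relies on resources being bound per thread. As a rough mental model only (this is not Spring's code, and ThreadBoundResourcesSketch is a hypothetical name), a thread-bound registry can be sketched with a ThreadLocal map keyed by the DataSource:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-thread resource registry, keyed by an arbitrary object such as a DataSource. */
final class ThreadBoundResourcesSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    /** Binds a resource for the current thread; binding the same key twice is an error. */
    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for this thread: " + key);
        }
    }

    /** Returns the resource bound for the current thread, or null. */
    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    /** Removes and returns the resource bound for the current thread. */
    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }

    /** Performs the lookup on a different thread to show that bindings are not shared. */
    static Object getFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = get(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSource = new Object();
        bind(dataSource, "connection-1");
        System.out.println("same thread:  " + get(dataSource));
        System.out.println("other thread: " + getFromOtherThread(dataSource));
        unbind(dataSource);
    }
}
```

This is why a nested call on the same thread finds the connection of the outer transaction, while work handed to another thread does not.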
4.2.2 If a transaction already exists, hand over to the nested-transaction handling in handleExistingTransaction.
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    // PROPAGATION_NEVER
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // PROPAGATION_NOT_SUPPORTED
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // PROPAGATION_REQUIRES_NEW
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // Starting a new transaction: suspend the existing one first
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Same as 4.2.6
            doBegin(transaction, definition);
            // Same as 4.2.7
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }
    // PROPAGATION_NESTED
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // JTA cannot use savepoints, so begin a new (nested) transaction instead
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
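Taken together, getTransaction and handleExistingTransaction form a decision table over two inputs: the propagation behavior and whether a transaction already exists. The sketch below restates that table as a pure function. TxPropagation, TxAction and decide are hypothetical names, and the NESTED case assumes nested transactions are allowed and savepoints are used (the non-JTA path).

```java
/** Hypothetical mirror of the seven propagation behaviors. */
enum TxPropagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

/** Hypothetical summary of what the transaction manager ends up doing. */
enum TxAction { THROW, BEGIN_NEW, JOIN_EXISTING, SUSPEND_AND_BEGIN_NEW, SUSPEND_AND_RUN_WITHOUT, SAVEPOINT, RUN_WITHOUT }

final class PropagationSketch {

    static TxAction decide(TxPropagation propagation, boolean transactionExists) {
        if (transactionExists) {
            // Mirrors the branches of handleExistingTransaction
            switch (propagation) {
                case NEVER:         return TxAction.THROW;
                case NOT_SUPPORTED: return TxAction.SUSPEND_AND_RUN_WITHOUT;
                case REQUIRES_NEW:  return TxAction.SUSPEND_AND_BEGIN_NEW;
                case NESTED:        return TxAction.SAVEPOINT;
                default:            return TxAction.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Mirrors the remainder of getTransaction when no transaction exists
        switch (propagation) {
            case MANDATORY:
                return TxAction.THROW;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return TxAction.BEGIN_NEW;
            default:
                return TxAction.RUN_WITHOUT; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (TxPropagation p : TxPropagation.values()) {
            System.out.println(p + ": none -> " + decide(p, false) + ", existing -> " + decide(p, true));
        }
    }
}
```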
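4.2.3 Validate the transaction timeout.
4.2.4 Handle propagationBehavior: different propagation behaviors lead to different outcomes.
4.2.5 Build the DefaultTransactionStatus (on the normal path, when a propagation behavior that starts a transaction is set).
4.2.6 Complete the transaction object: set the ConnectionHolder, isolation level and timeout, and bind the connection to the current thread if it is new. This is the DataSourceTransactionManager.doBegin method.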
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // Obtain a new connection only if there is no ConnectionHolder yet,
        // or the existing one is already synchronized with a transaction
        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // Obtain a connection from the DataSource (typically a connection pool)
            Connection newCon = this.dataSource.getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        // Isolation level and read-only are both applied by configuring the Connection
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // Switch to manual commit so that Spring controls when to commit
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }
        // Set the flag used to decide whether the current thread already has a transaction
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        // Bind the session holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        ...
    }
}
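The auto-commit step in doBegin is worth isolating: auto-commit is switched off only if it was on, and the fact that it was changed is remembered so it can be put back later. The sketch below shows that pattern against the standard java.sql.Connection interface. It is an illustration under assumptions: AutoCommitSketch and its methods are hypothetical, the connection is a recording fake built with java.lang.reflect.Proxy that supports only getAutoCommit and setAutoCommit, and the restore step is assumed to belong to cleanup code that this article does not show.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class AutoCommitSketch {

    /** Turns auto-commit off if it is on; returns whether it must be restored later. */
    static boolean switchToManualCommit(Connection con) throws SQLException {
        if (con.getAutoCommit()) {
            con.setAutoCommit(false);
            return true;
        }
        return false;
    }

    /** Puts auto-commit back only if this code was the one that turned it off. */
    static void restoreAutoCommit(Connection con, boolean mustRestore) throws SQLException {
        if (mustRestore) {
            con.setAutoCommit(true);
        }
    }

    /** Hypothetical fake connection that records setAutoCommit calls. */
    static Connection recordingConnection(List<String> calls) {
        boolean[] autoCommit = {true}; // JDBC connections start in auto-commit mode
        return (Connection) Proxy.newProxyInstance(
                AutoCommitSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getAutoCommit":
                            return autoCommit[0];
                        case "setAutoCommit":
                            autoCommit[0] = (Boolean) args[0];
                            calls.add("setAutoCommit(" + args[0] + ")");
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    /** Runs the begin/restore sequence once and returns what happened. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Connection con = recordingConnection(calls);
        try {
            boolean first = switchToManualCommit(con);   // auto-commit was on: switched off
            boolean second = switchToManualCommit(con);  // already off: nothing to do
            calls.add("mustRestore=" + first + "," + second);
            restoreAutoCommit(con, first);
        } catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Remembering the flag matters with pooled connections: a connection handed back to the pool should behave the way the next borrower expects.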
4.2.7 Set up the new transaction synchronization for the current thread.
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}
Exception handling (core flow step 6)
- To be continued
Commit (core flow step 8)
- To be continued