配置注解版事务
事务管理器
- 事务管理器的接口是
PlatformTransactionManager
,其中定义了三个接口方法如下:-
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException
:获取事务,如果当前没有事务,那么就根据传播级别创建一个事务。-
TransactionDefinition
:其中定义了事务的传播属性,比如默认的传播属性(当前没有事务就开启事务)等。
-
-
void commit(TransactionStatus status) throws TransactionException;
:提交事务-
TransactionStatus
:其中定义了一些事务的状态和查询、判断事务状态的方法
-
-
void rollback(TransactionStatus status) throws TransactionException;
:回滚事务
-
-
PlatformTransactionManager
的实现类有很多,比如结合JDBC操作的DataSourceTransactionManager
、配置JTA、Hibernate的事务管理器。
注入事务管理器【JDBC】
- 注入
PlatformTransactionManager
,步骤如下:- 注入数据源,这里我们使用的是结合JDBC,因此需要注入对应的事务管理器
DataSourceTransactionManager
- 注入数据源,这里我们使用的是结合JDBC,因此需要注入对应的事务管理器
//注入数据源,这里使用的是阿里的Druid,这里只是简单的配置
@Bean
public DruidDataSource dataSource(){
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUsername("******");
dataSource.setPassword("*****");
dataSource.setUrl("******");
dataSource.setInitialSize(10);
dataSource.setMaxActive(20);
dataSource.setMaxIdle(100000);
return dataSource;
}
//注入事务管理器
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager(dataSource);
return dataSourceTransactionManager;
}
- 开启事务,使用
@EnableTransactionManagement
@Configuration
@ComponentScan(basePackages = {"cn.tedu.demo.*"})
@EnableTransactionManagement
public class MainConfig {}
源码解析事务
必知的知识和类
PlatformTransactionManager
- 事务管理器的接口,其中定义一些事务的方法,有提交,回滚,获取事务的方法。
- 实现类如下:
-
DataSourceTransactionManager
:适用于JDBC事务的管理 -
JtaTransactionManager
:适用于多数据源的事务管理,实现强一致性事务
-
TransactionDefinition
- 该接口主要定义了事务的一些属性,比如事务的传播行为,隔离级别等数据。
@EnableTransactionManagement
- 此注解的源码如下,其实真正起作用的就是
@Import(TransactionManagementConfigurationSelector.class)
,使用@Import
这个注解向容器中注入了其他的Bean,详情请看我的Spring注解版开发,其中有@Import的使用
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
TransactionManagementConfigurationSelector
- 这个实现了
ImportSelector
,结合@Import注解使用,是@Import注解的注入Bean的其中一种方式,有一个必须重载的方法,如下:- 此方法返回的
BeanName
的数组的全部Bean将会被注入到容器中
- 此方法返回的
String[] selectImports(AnnotationMetadata importingClassMetadata);
- TransactionManagementConfigurationSelector对上面的这个方法重写了,如下:
- 主要流程就是根据
@EnableTransactionManagement
中mode属性返回对应的BeanName,如果值是PROXY
【默认】,那么就会注入AutoProxyRegistrar
、ProxyTransactionManagementConfiguration
这两个Bean,那么这个方法的作用就是如此,因此我们需要看看注入的两个Bean到底是什么作用?
- 主要流程就是根据
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
AutoProxyRegistrar
- 该类实现了
ImportBeanDefinitionRegistrar
,主要的作用就是根据@EnableTransactionManagement
属性中的mode
和proxyTargetClass
,注入对应的AutoProxyCreator
【APC】,代码如下:
//importingClassMetadata:其中封装了配置类的所有注解
//registry:用于注入BeanDefintion
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
//标记
boolean candidateFound = false;
//获取所有的注解类型
Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
//遍历注解
for (String annoType : annoTypes) {
//获取指定注解的全部属性的值
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
if (candidate == null) {
continue;
}
//获取对应的mode和proxyTargetClass属性的值
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
//如果这些值都存在,那么可以判定配置类上标注了`@EnableTransactionManagement`这个注解
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
//标记设置为true
candidateFound = true;
//根据mode的值,注入不同的APC
if (mode == AdviceMode.PROXY) {
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
if (!candidateFound && logger.isInfoEnabled()) {
String name = getClass().getSimpleName();
logger.info(String.format("%s was imported but no annotations were found " +
"having both 'mode' and 'proxyTargetClass' attributes of type " +
"AdviceMode and boolean respectively. This means that auto proxy " +
"creator registration and configuration may not have occurred as " +
"intended, and components may not be proxied as expected. Check to " +
"ensure that %s has been @Import'ed on the same class where these " +
"annotations are declared; otherwise remove the import of %s " +
"altogether.", name, name, name));
}
}
-
其中核心的代码就是注入不同的APC的代码,如下:
-
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
- 实际作用的源码如下:
//cls是InfrastructureAdvisorAutoProxyCreator.class,APC的一种实现类 //注入的BeanName是org.springframework.aop.config.internalAutoProxyCreator private static BeanDefinition registerOrEscalateApcAsRequired( Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) { Assert.notNull(registry, "BeanDefinitionRegistry must not be null"); //判断对应的APC是否已经注入了 if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) { BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME); if (!cls.getName().equals(apcDefinition.getBeanClassName())) { int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName()); int requiredPriority = findPriorityForClass(cls); if (currentPriority < requiredPriority) { apcDefinition.setBeanClassName(cls.getName()); } } return null; } //没有注入,直接使用RootBeanDefinition注入,BeanName是org.springframework.aop.config.internalAutoProxyCreator RootBeanDefinition beanDefinition = new RootBeanDefinition(cls); beanDefinition.setSource(source); beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition); return beanDefinition; }
-
-
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
:强制使用子类代理【cglib代理】,实际作用的源码如下:- 实际的作用就是设置了proxyTargetClass为true
public static void forceAutoProxyCreatorToUseClassProxying(BeanDefinitionRegistry registry) {
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
BeanDefinition definition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
definition.getPropertyValues().add("proxyTargetClass", Boolean.TRUE);
}
}
总结
- 该类是TransactionManagementSlector选择器注入的类,主要作用就是根据@EnableTranactionMangement注解中的mode和proxyTarget的值注入(AutoProxyCreator)【简称APC】,实际的APC的类型是
InfrastructureAdvisorAutoProxyCreator
ProxyTransactionManagementConfiguration
- 该类是一个配置类,如下:
//ProxyTransactionManagementConfiguration配置类
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {}
//AbstractTransactionManagementConfiguration
@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {}
- 从上面的源码可以知道抽象的
AbstractTransactionManagementConfiguration
类实现了ImportAware
,那么其中的重载的方法一定是很重要的,代码如下:
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
//获取@EnableTransactionMangement的属性值赋值给enableTx
this.enableTx = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
//如果为null,抛出异常
if (this.enableTx == null) {
throw new IllegalArgumentException(
"@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
}
}
- 上述讲解了
AbstractTransactionManagementConfiguration
中的一个重要实现,其实就是将@EnableTransactionMangement
的属性值赋值给enableTx - 从
ProxyTransactionManagementConfiguration
的源码可以知道,其实就是向容器中注入了三个Bean,分别是org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor
、org.springframework.transaction.interceptor.TransactionAttributeSource
、org.springframework.transaction.interceptor.TransactionInterceptor
ImportAware
该类的作用是获取标注在实现了该接口的配置类上的所有注解的元数据,包括注解的属性,值,类型等信息。
-
ImportAware同样实现了Aware接口,但是这个和BeanFactoryAware、ResourceLoaderAware等不同的是,这个接口必须是由配置类【即是标注了@Configuration注解】实现,并且需要结合@Import注解使用才能生效,否则不能生效,如下的使用方式将会生效。
- 一定要结合@Import使用,可以直接导入配置类,也可以使用selector方式的注入,总之是要结合@Import注解使用,否则不能生效
@Configuration
@ComponentScan(basePackages = {"cn.tedu.demo.*"})
@EnableTransactionManagement
@Import(value = {DruidConfig.class})
public class MainConfig {}
@Configuration
public class DruidConfig implements ImportAware {
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
System.out.println(importMetadata);
}
}
InfrastructureAdvisorAutoProxyCreator【@EnableTransactionManagement导入】
该APC是在@EnableTransactionMangement注解作用下注入到ioc容器中的
代理创建器,继承了
AbstractAutoProxyCreator
,这个和注解版AOP的代理创建器(AnnotationAwareAspectJAutoProxyCreator
)继承的是一个类,主要的作用就是创建代理对象,我们之前也是知道事务的底层其实就是使用AOP进行操作的,继承的关系图如下:
[图片上传失败...(image-d23350-1562499352984)]
- 从上面的继承关系图可以很清晰的看到,这个类实现了不少的关于Bean生命周期的接口,因此我们只需要把实现的这些接口打上断点,即可清楚的分析出执行的流程了。这个和AOP讲解的类似,有些东西就不再一一讲述了
- 主要的功能就是为标注了
@Transactional
创建代理对象,其中最重要的逻辑就是找到候选的增强器【事务的实现注入的增强器就是BeanFactoryTransactionAttributeSourceAdvisor】,主要的逻辑如下:
1)在org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#wrapIfNecessary方法中是创建代理对象的主要方法,不过在这之前需要获取所有适用当前Bean的所有增强器(Advisor),调用的是Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);这个方法
1.1) 进入org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator#getAdvicesAndAdvisorsForBean方法,调用的是List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName)这段代码,获取可用的增强器
1.2) 进入,实际调用的是List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName)这段代码,获取能够作用在beanClass的增强器
1.3)进入,实际调用的是AopUtils.findAdvisorsThatCanApply(candidateAdvisors, beanClass)这段代码
1.4)一路跟进,最终到了org.springframework.aop.support.AopUtils#canApply(org.springframework.aop.Pointcut, ava.lang.Class<?>, boolean),主要的逻辑开始了
1.4.1) 最重要的代码便是循环获取的接口,获取其中的方法,调用methodMatcher.matches(method, targetClass)匹配,源码如下:
for (Class<?> clazz : classes) {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
for (Method method : methods) {
if (introductionAwareMethodMatcher != null ?
introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
methodMatcher.matches(method, targetClass)) {
return true;
}
}
}
1.4.2)跟进,进入了org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#computeTransactionAttribute方法,主要的逻辑就是先判断当前Bean的所有方法是否有@Transactional注解,之后判断该类头上是否有@Transactional注解
1.4.3)最重要的解析是否存在@Transactional注解的方法就是org.springframework.transaction.annotation.SpringTransactionAnnotationParser#parseTransactionAnnotation(java.lang.reflect.AnnotatedElement),其中的逻辑如下:
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
//获取注解@Transactional中的属性值
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
//如果属性值不为空,表示该类获取方法上标注了@Transactional注解
if (attributes != null) {
//解析注解的属性值,封装在TransactionAttribute中返回
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
//解析注解的属性值,将其封装在TransactionAttribute中
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
SpringTransactionAnnotationParser
- @Transactional注解解析器,主要的作用就是解析指定类或者方法【标注有@Transactional注解的】上的@Transactional注解中的属性值,将其封装在
TransactionAttribute
中,主要的逻辑如下:
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
//获取注解中的所有属性值
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
//如果属性不为空,调用parseTransactionAnnotation方法封装
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
//封装传播行为和隔离级别
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
//封装回滚规则
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
AnnotationTransactionAttributeSource【@EnableTransactionManagement导入】
实现了
TransactionAttributeSource
,简单的说这个类中封装了所有类、方法上标注的@Transactional注解的属性值【以TransactionAttribute的属性保存在private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024)
,key是通过类,方法生成的MethodClassKey
对象,value是TransactionAttribute】-
重要的属性:
- attributeCache中存储了所有类的方法的对应的@Transactional注解属性的值,后续在执行事务的时候,就是直接从这个缓存中直接获取的。
-
其中有几个重要的方法,如下:
-
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass)
:根据类、方法获取指定的TransactionAttribute ,具体的实现在AbstractFallbackTransactionAttributeSource
类中,源码如下:- 其中findTransactionAttribute()方法没有继续跟进,因为其中的执行逻辑就是使用注解解析器(SpringTransactionalAnnotationParser)进行对@Transactional注解的解析,封装在TransactionAttribute返回。
- attributeCache中存储了所有类的方法的对应的@Transactional注解属性的值,后续在执行事务的时候,就是直接从这个缓存中直接获取的。
//method:指定的方法,targetClass指定的类 public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { //如果是Object,直接返回null if (method.getDeclaringClass() == Object.class) { return null; } // 第一步先直接从缓存中取值,如果存在,直接返回即可 Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute, // or an actual transaction attribute. if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { //如果缓存中没有,那么需要调用computeTransactionAttribute方法去ioc中解析 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); //将取出的值存入缓存中,下次再取就不需要解析了,直接取值即可 if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { //获取方法的名称,全类名+方法名的形式 String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isTraceEnabled()) { logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } } /**********************************computeTransactionAttribute****************/ protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { //如果方法不是public类型的,直接返回null if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } //如果method方法是在接口中定义的方法,那么获取接口实现类的方法。 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // 尝试获取目标方法上标注的@Transactional注解的属性【因为这个注解可以标注在方法和类上】 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // 获取标注在类上的注解的属性 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
-
protected TransactionAttribute findTransactionAttribute(Class<?> clazz)
:获取指定类上的@Transactional的属性值protected TransactionAttribute findTransactionAttribute(Method method)
:获取指定方法的上的@Transactional属性的值protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element)
:获取@Transactional属性值的真正调用的方法,执行的逻辑就是使用前面讲过的SpringTransactionAnnotationParser进行注解的解析,代码如下:
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
//遍历可用的事务注解解析器->this.annotationParsers
for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
//调用解析的方法,进行解析
TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}
/***org.springframework.transaction.annotation.SpringTransactionAnnotationParser#parseTransactionAnnotation(java.lang.reflect.AnnotatedElement)****/
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
//直接获取元素上@Transactional注解的属性,如果获取都了,将其解析成TransactionAttribute对象返回即可
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
return parseTransactionAnnotation(attributes);
}
//不存在该注解,那么直接返回null
else {
return null;
}
BeanFactoryTransactionAttributeSourceAdvisor【@EnableTransactionManagement导入】
- 事务的增强器,其中封装了拦截器和Advice。
TransactionInterceptor【重要】
事务拦截器,顾名思义,就是在方法执行之前进行一些操作,开启事务就是使用拦截器在调用方法之前开启的。
-
其中几个重要的方法,如下:
-
public Object invoke(MethodInvocation invocation)
:jdk动态代理执行拦截器链的时候会执行的方法,内部真正调用的是父类TransactionAspectSupport
的invokeWithinTransaction方法 -
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation)
:以事务的方式调用,关于事务的周期,比如开启,提交,回滚等等都是从此方法进入的,源码如下:TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
:从此段代码进去是开启事务的逻辑completeTransactionAfterThrowing(txInfo, ex);
:从此段代码进入是出现异常回滚事务的逻辑commitTransactionAfterReturning(txInfo);
:从此段代码进入是提交事务的逻辑
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); return retVal; }
-
调用标注@Transactional方法
前提
- 事务默认使用的是JDK的动态代理,并不是cglib代理
- 方法或者类上标注了@Transactional注解
- 配置类标注了@EnableTransactionManagement,开启事务
解析
- 调用标注有@Transactional注解的方法,进入了
org.springframework.aop.framework.JdkDynamicAopProxy#invoke
方法,进行了JDK的动态代理,源码如下:this.advised
:其实就是ProxyFactory(代理工厂),其中在创建代理对象的时候,封装了targetSource和事务增强器(BeanFactoryTransactionAttributeSourceAdvisor)- 其中最重要的一段代码就是
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
,获取适用于该方法的拦截器链 retVal = invocation.proceed();
:通过拦截器执行该方法,内部实现了事务的一些逻辑。
//proxy 代理对象 method方法,args参数
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;
//获取targetSource
TargetSource targetSource = this.advised.targetSource;
Object target = null;
try {
//判断方法是否是equals、hashCode方法
if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
// The target does not implement the equals(Object) method itself.
return equals(args[0]);
}
else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
// The target does not implement the hashCode() method itself.
return hashCode();
}
else if (method.getDeclaringClass() == DecoratingProxy.class) {
// There is only getDecoratedClass() declared -> dispatch to proxy config.
return AopProxyUtils.ultimateTargetClass(this.advised);
}
else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
method.getDeclaringClass().isAssignableFrom(Advised.class)) {
// Service invocations on ProxyConfig with the proxy config...
return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
}
//封装返回值
Object retVal;
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
//获取目标对象和目标类
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
//获取拦截器链【重要方法】
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
//拦截器链为空表示没有使用事务,直接调用即可
if (chain.isEmpty()) {
//获取能够适用的参数
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
//直接调用方法
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
//拦截器链不为空,需要拦截执行,创建ReflectiveMethodInvocation
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
//通过拦截器链执行
retVal = invocation.proceed();
}
// 判断返回值类型
Class<?> returnType = method.getReturnType();
//如果返回值是对象本身,即是return this,那么返回的必须还是代理对象proxy,否则后续的方法不能使用代理对象了。
if (retVal != null && retVal == target &&
returnType != Object.class && returnType.isInstance(proxy) &&
!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
// Special case: it returned "this" and the return type of the method
// is type-compatible. Note that we can't help if the target sets
// a reference to itself in another returned object.
retVal = proxy;
}
else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
// Must have come from TargetSource.
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
- 继续上面的获取拦截器链的方法,如下:
- 第一次调用是需要执行
this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(this, method, targetClass);
方法,后续是直接从缓存中获取即可
- 第一次调用是需要执行
public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Method method, @Nullable Class<?> targetClass) {
//获取key
MethodCacheKey cacheKey = new MethodCacheKey(method);
//获取缓存中的数据
List<Object> cached = this.methodCache.get(cacheKey);
//缓存为空,调用this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice方法
if (cached == null) {
cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(
this, method, targetClass);
//存入缓存中
this.methodCache.put(cacheKey, cached);
}
return cached;
}
- 进入
org.springframework.aop.framework.DefaultAdvisorChainFactory#getInterceptorsAndDynamicInterceptionAdvice
方法,如下:- 该方法执行成功返回的List就是适用于方法上的拦截器链,事务的拦截器是TransactionIntercept
public List<Object> getInterceptorsAndDynamicInterceptionAdvice(
Advised config, Method method, @Nullable Class<?> targetClass) {
//获取GlobalAdvisorAdapterRegistry
AdvisorAdapterRegistry registry = GlobalAdvisorAdapterRegistry.getInstance();
//获取ProxyFacotry中的增强器
Advisor[] advisors = config.getAdvisors();
//存储可用的全部拦截器
List<Object> interceptorList = new ArrayList<>(advisors.length);
//获取真正的目标类
Class<?> actualClass = (targetClass != null ? targetClass : method.getDeclaringClass());
Boolean hasIntroductions = null;
//遍历增强器,事务的增强器是BeanFactoryTransactionAttributeSourceAdvisor
for (Advisor advisor : advisors) {
//判断是否是PointcutAdvisor类型的,事务的增强器就是这种类型的
if (advisor instanceof PointcutAdvisor) {
// 强转
PointcutAdvisor pointcutAdvisor = (PointcutAdvisor) advisor;
//判断增强器是否之前已经过滤或者此增强器匹配目标类
if (config.isPreFiltered() || pointcutAdvisor.getPointcut().getClassFilter().matches(actualClass)) {
MethodMatcher mm = pointcutAdvisor.getPointcut().getMethodMatcher();
//标记是否匹配
boolean match;
//判断类型
if (mm instanceof IntroductionAwareMethodMatcher) {
if (hasIntroductions == null) {
hasIntroductions = hasMatchingIntroductions(advisors, actualClass);
}
match = ((IntroductionAwareMethodMatcher) mm).matches(method, actualClass, hasIntroductions);
}
else {
//匹配是否适用,其实内部就是判断TransactionAttributeSource中的attributeCache的value是否为null
match = mm.matches(method, actualClass);
}
//匹配了,添加
if (match) {
//判断增强器中的advice是否是MethodInterceptor等类型
MethodInterceptor[] interceptors = registry.getInterceptors(advisor);
if (mm.isRuntime()) {
// Creating a new object instance in the getInterceptors() method
// isn't a problem as we normally cache created chains.
for (MethodInterceptor interceptor : interceptors) {
interceptorList.add(new InterceptorAndDynamicMethodMatcher(interceptor, mm));
}
}
else {
interceptorList.addAll(Arrays.asList(interceptors));
}
}
}
}
//类型是IntroductionAdvisor
else if (advisor instanceof IntroductionAdvisor) {
IntroductionAdvisor ia = (IntroductionAdvisor) advisor;
if (config.isPreFiltered() || ia.getClassFilter().matches(actualClass)) {
Interceptor[] interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
}
else {
Interceptor[] interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
}
return interceptorList;
}
- 拦截器链正确返回后,此时代码就执行到了
retVal = invocation.proceed();
,进入,源码如下:- 真正以事务执行的方法是在父类
TransactionAspectSupport
中的invokeWithinTransaction
方法
- 真正以事务执行的方法是在父类
//此方法是循环调用的,每一个拦截器调用都会执行一遍,因此currentInterceptorIndex是表示执行到的拦截器下标
public Object proceed() throws Throwable {
//如果拦截器全部执行完成,那么就开始调用目标方法了
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
//获取当前的拦截器
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
//判断拦截器类型是InterceptorAndDynamicMethodMatcher,事务的拦截器显然不是,跳过执行
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
//执行拦截器的invoke方法
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
//org.springframework.transaction.interceptor.TransactionInterceptor#invoke
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
//执行父类的invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
-
org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
- final PlatformTransactionManager tm = determineTransactionManager(txAttr):获取事务管理器
- TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification):内部重要的逻辑就是调用doBegin开启事务。
- commitTransactionAfterReturning(txInfo):提交事务,调用是org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit方法
- completeTransactionAfterThrowing(txInfo, ex):对事务的异常处理,根据@Transactional中的定义的异常,做出不同的处理,内部做了事务的回滚,最终回滚的方法是
org.springframework.jdbc.datasource.DataSourceTransactionManager#doRollback
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
//获取TransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
//获取TransactionAttribute,如果为null,那么是以非事务的方式执行
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//获取事务管理器,见下方的方法
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//判断
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
//创建TransactionInfo,此处就会开启事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
//再次调用invoke方法,执行下一个拦截器
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//对事务的异常处理,根据@Transactional中的定义的异常,做出不同的处理,内部做了事务的回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//重新设置TransactionInfo信息
cleanupTransactionInfo(txInfo);
}
//正确返回之后,提交事务,调用的是org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
commitTransactionAfterReturning(txInfo);
//返回结果
return retVal;
}
}
//determineTransactionManager:获取事务管理器,先获取自身的,后获取注入的事务管理器,放入缓存中,下次直接用
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
//如果TransactionAttribute为null,不以事务的形式执行
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}
//获取@Transactional中的qualifier属性的值,根据名称获取事务管理器
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
//根据BeanName获取事务管理器
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
//根据transactionManagerBeanName获取事务管理器
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
//获取默认的事务管理器
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
//获取缓存中的事务管理器
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
//从ioc容器中获取事务管理器
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
//放入缓存
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
//createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果TransactionAttribute中没有指定name,那么使用joinpointIdentification【方法的全类名+方法名】
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
//TransactionStatus中存储的是事务运行的各个状态
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取事务的当前状态,内部调用最重要的方法是doBegin,开启事务,设置自动提交为false
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
//org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
//主要的作用是开启事务con.setAutoCommit(false);
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//获取数据库连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
//如果是自动提交的,那么设置不是自动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//准备工作,如果标记了readOnly=true,那么设置事务事只读的。
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
//判断设置的超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
//org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
//提交事务
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
//提交事务
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
总结
- 事务执行流程:
- 标注@EnableTransactionManagement,内部使用@Import注解导入了几个Bean,分别是
InfrastructureAdvisorAutoProxyCreator
、BeanFactoryTransactionAttributeSourceAdvisor
、AnnotationTransactionAttributeSource
、TransactionInterceptor
- 对类上或者方法上标注了@Transactional注解的类生成代理对象,调用AbstractAutoProxyCreator中的
wrapIfNecessary
生成- 重要的逻辑就是使用SpringAnnotationTransactionParser解析每一个类每一个方法上标注的@Transactional注解的属性值,封装在TransactionalAttribute中,最终将这些TransactionalAttribut统一封装在TransactionalAttributeSource中【使用
attributeCache
进行缓存】 - 最终将增强器Advisor和拦截器封装在ProxyFactory中
- 重要的逻辑就是使用SpringAnnotationTransactionParser解析每一个类每一个方法上标注的@Transactional注解的属性值,封装在TransactionalAttribute中,最终将这些TransactionalAttribut统一封装在TransactionalAttributeSource中【使用
- 执行的时候先调用
org.springframework.aop.framework.JdkDynamicAopProxy#invoke
方法- 先获取适用的拦截器链,最终以循环调用的方式(类似)执行拦截器
- 调用TransactionInterceptor中的invoke方法,最终执行的
TransactionAspect
中的invokeWithTransactional方法,在这方法内部涉及了事务的生命周期,如开启事务,回滚事务,提交事务等等操作,都是从此方法进入的。最终执行的处理是调用事务管理器中的doXXX
方法,比如doCommit
方法。
- 标注@EnableTransactionManagement,内部使用@Import注解导入了几个Bean,分别是