什么是事务?
事务(Transaction):指访问并可能更新数据库中各种数据项的一个程序执行单元(unit),它是恢复和并发控制的基本单位。
通常我们在业务逻辑处理代码中对数据库的同一组增删改查操作就是一个事务操作,由此可见事务的前提必须是使用的数据库的底层支持,但是并不是所有的数据库(准确来说是存储引擎,因为数据库持久化的底层是对磁盘读写的操作)都支持事务操作。
因此Spring中支持的事务其实是对数据库事务的一种代理封装,具体的事务操作必须依赖所使用的数据库提供的开放API;以常见的数据库MySQL为例,Spring使用MySQL事务的前提需要导入mysql-connector-java依赖包,它就是MySQL用Java语言开发的客户端应用程序,它提供了数据库的API及许多增值扩展。
那这意味着什么呢?
这意味着你完全可以基于它来自定义封装一个事务框架,甚至是分布式事务框架,万丈高楼平地起,发展空间很大哦 _。
准备工作
既然已经知道了Spring事务的由来,那我们还需要了解Spring中事务的一些基础知识。
特性
事务应该具有的4个属性,我们称为ACID特性:
- 原子性(Automicity):一个事务操作中不可分割的最小工作单位,要么都成功,要么都失败。
- 一致性(Consistency):事务前后的数据必须保证完整一致性。
- 隔离性(Isolation):各个事务之间的执行操作是相互隔离且互不影响的。
- 持久性(Durability):一个事务一旦提交,对数据库中数据的改变是永久性的,不会被回滚。
并发问题
通常执行程序在多核CPU中会发生并发执行,因此带来了多线程的并发问题,而事务在并发情况下同样也会带来读一致性的问题,像以下3种:
- 脏读:在一个事务中执行insert、update、delete操作,但是还未提交,另外一个事务读取了未提交的数据,一旦前面事务发生回滚,那么后面的事务就产生了脏读。
- 不可重复读:在一个事务中发生的两次读操作中间,另一个事务对数据进行了update或delete操作,导致两次读取数据结果不一致的问题。
- 幻读:在一个事务执行select、insert、update、delete操作时,另外一个事务进行了insert操作,这时前面的事务会无法操作新增的数据,从而产生幻读问题。
隔离级别
上面描述了并发环境下事务会产生的问题,再结合前面介绍的特性,能知道通过隔离性能很好地解决这些问题,而在数据库中支持4种隔离级别:
未提交读(Read-Uncommitted):未提交的事务操作数据允许被其他事务读取到,它能导致脏读、不可重复读、幻读的问题。
已提交读(Read-Committed):已提交的事务操作数据才允许被其他事务读取到,它能导致不可重复读、幻读的问题。
可重复读(Repeatable-Read):它在已提交读的基础上解决了不可重复读的问题,但是还是会产生幻读的问题。
串行化(Serializable):所有事务必须串行化一个一个执行,不能并发执行,避免了脏读、不可重复读、幻读的问题,但是严重影响执行效率。
Spring中默认配置隔离级别是使用的数据库的默认级别,而MySQL中默认的隔离级别为可重复读,同时默认的隔离级别在MySQL5.6版本开始的InnoDB存储引擎中也已经通过临键间隙锁(Next-Key锁)解决幻读的问题;而上面的隔离级别从上往下越能保证数据的完整性和一致性,但是并发性能却越来越低。
传播行为
Spring的传播行为,就是在实际业务代码开发中,同时存在多个事务时,Spring应该怎样去处理这些事务的行为。这些传播属性定义在类TransactionDefinition中,分别是以下7种:
- PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。
- PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。
- PROPAGATION_MANDATORY:如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。
- PROPAGATION_REQUIRES_NEW:创建一个新的事务,如果当前存在事务,则把当前事务挂起。
- PROPAGATION_NOT_SUPPORTED:以非事务方式运行,如果当前存在事务,则把当前事务挂起。
- PROPAGATION_NEVER: 以非事务方式运行,如果当前存在事务,则抛出异常。
- PROPAGATION_NESTED:如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于TransactionDefinition.PROPAGATION_REQUIRED。
而Spring中默认的传播行为为PROPAGATION_REQUIRED,实际开发中,我们使用@Transactional来启用注解,其中的propagation属性可以进行事务传播行为的配置,而属性值就是Propagation枚举类来定义的。
声明式配置
Spring中对于数据库事务支持声明式事务和编程式事务,不管是实际开发还是Spring官方都推荐使用声明式事务,因为它对应用程序代码的侵入性最小;它有两种配置方式:
- 基于XML的配置:在XML中通过
<tx:advice/>
标签指定的各种事务设置。 - 基于注解的配置:在代码中使用@Transactional注解配置,这也是实际开发中最常用的方式。
这两种方式默认设置如下:
- 传播行为是
REQUIRED
; - 隔离级别为
DEFAULT
; - 事务是读写的;
- 事务超时默认为底层事务系统的默认超时,如果不支持超时,则无;
- 任何
RuntimeException
触发器都会回滚,任何检查Exception
都不会。
事务的流程
通常我们在开发中对数据库的执行操作的是通过集成第三方ORM框架完成的,像MyBatis、JPA、Hibernate等,而非Spring自带的封装实现;但是通常为了方便使用,数据源及事务管理等还是会交给Spring去统一管理。
这里我们基于Mysql数据库、注解式事务提供一个简单的Demo来作为入口去解析源码中事务的执行流程。
@Configuration
@EnableTransactionManagement
public class Config {
@Bean
public DataSource setDataSource(){
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/test");
dataSource.setUsername("root");
dataSource.setPassword("123456");
return dataSource;
}
@Bean
public PlatformTransactionManager setPlatformTransactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
public JdbcTemplate setJdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
@Service
public class UserService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional(rollbackFor = Exception.class)
public void handle(){
jdbcTemplate.update("insert into user(user_name,phone) values(?,?)","张三","13212312341");
update();
}
@Transactional(rollbackFor = Exception.class)
public void update(){
jdbcTemplate.update("update user set phone = ? where id = ?","11122223333",2);
}
}
public class Test {
public static void main(String[] args) {
AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext("com.test.spring.transaction");
UserService userService = ac.getBean(UserService.class);
userService.handle();
}
}
在解析事务的处理过程前,我们需要提前做好两步工作:
- 配置数据源
DataSource
; - 定义事务管理器
PlatformTransactionManager
;
事务管理器
我们知道所有对数据库的操作都是基于API来实现的,所以得先通过配置数据源来连接数据库,那么第二步的事务管理器的作用是什么?
首先事务管理器是Spring提供的一种事务的抽象编程模型,它可以跨不同事务API,同时提供了声明示事务的支持;我们可以理解为它就是一个顶层的事务管理接口,我们可以基于它的实现很好地集成到Spring框架中去使用事务。
而Spring中的事务管理类型分为两种:
- 命令式:PlatformTransactionManager,与
@Transactional
配合使用,管理同一个线程绑定的一些列事务操作。 - 反应式:ReactiveTransactionManager,提供Reactive反应式编程中事务管理,使用 Reactor 上下文执行事务操作。
通常我们使用的就是PlatformTransactionManager,我们先看下它的源码,
public interface PlatformTransactionManager {
//获取事务执行和查询状态
TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;
//事务提交
void commit(TransactionStatus var1) throws TransactionException;
//事务回滚
void rollback(TransactionStatus var1) throws TransactionException;
}
它是一个顶层接口,定义了几个模板方法,将开发中业务逻辑代码和数据库事务相关执行代码进行了拆分,降低了代码之间的耦合性,提高了复用性。
我们通过它的类图能看到已经提供了很多子类实现,它们各自的作用这里不一一介绍了,感兴趣的可以去了解下。
启用流程
在解析了一些相关Spring源码过程中,相信已经发现,基本任何有关Spring的源码解析都绕不开Spring的IOC容器初始化过程。
这里同样也是,我们先在如下图位置进行断点debug,
发现这里的UserService对象实际是一个通过CGLIB代理的对象,首先可以想到这里的UserService因为没有实现接口,所以是通过CGLIB,而非使用JDK进行的动态代理;但是我们的Demo中好像并没有启用AOP及使用任何切点、切面的配置,只在方法上加了@Transactional注解,为什么这里从IOC容器中获取的是一个代理对象呢?难道@Transactional注解其实就是一个切点?
没错,Spring中事务的前提就是基于AOP的支持来实现的,那看样子我们需要从IOC及AOP的初始化过程中进行探究了,而之前我们已经解析了它们各自的初始化过程,相信从中找出有关事务相关代理流程应该不是问题。
Demo中我们在Config配置中加了@EnableTransactionManagement注解,它是用来启用事务管理器,看它的源码,
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({TransactionManagementConfigurationSelector.class})
public @interface EnableTransactionManagement {
//强制使用CGLIB代理
boolean proxyTargetClass() default false;
//通知类型:代理、AspectJ
AdviceMode mode() default AdviceMode.PROXY;
int order() default 2147483647;
}
是不是发现和@EnableAspectJAutoProxy有点类似,这里的属性值分别为:
- proxyTargetClass:默认false,Spring会根据被代理对象的实现接口情况去自动选择JDK或者CGLIB动态代理机制;设置为true时,代表代理机制强制使用CGLIB动态代理,但这样会导致无法对final修饰的方法进行拦截通知,因为它不能被覆写。
- AdviceMode:支持PROXY和AspectJ两种类型,默认为PROXY。
- order:加载顺序。
而这里通过@Import的TransactionManagementConfigurationSelector类的源码,
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
public TransactionManagementConfigurationSelector() {
}
protected String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[]{"org.springframework.transaction.aspectj.AspectJTransactionManagementConfiguration"};
default:
return null;
}
}
}
它的顶层父类是ImportSelector,而ImportSelector的作用是:在添加@Configuration
注解的Class上使用@Import
去引入了一个ImportSelector的实现子类后,会把实现类中selectImports()方法返回的ClassName都定义为Bean对象。
所以默认AdviceMode=PROXY时,它会根据根据AdviceMode类型选择加载注册AutoProxyRegistrar、ProxyTransactionManagementConfiguration对象。
启用代理
我们先看下AutoProxyRegistrar的类图,[图片上传失败...(image-a9e0ac-1632446770718)]
发现它和AspectJAutoProxyRegistrar一样都是ImportBeanDefinitionRegistrar的实现子类,而在AOP的源码解析中我们知道AOP自动代理的启用就是通过AspectJAutoProxyRegistrar来注册AnnotationAwareAspectJAutoProxyCreator完成的,而这里通过AutoProxyRegistrar注册InfrastructureAdvisorAutoProxyCreator来完成的,通过下面的类图我们就知道了,它们都是BeanPostProcessor的实现子类。
[图片上传失败...(image-14b47a-1632446770718)]
事务管理配置定义
在看下ProxyTransactionManagementConfiguration的源码,
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
public ProxyTransactionManagementConfiguration() {
}
@Bean(name = {"org.springframework.transaction.config.internalTransactionAdvisor"})
@Role(2)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(this.transactionAttributeSource());
advisor.setAdvice(this.transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder((Integer)this.enableTx.getNumber("order"));
}
return advisor;
}
@Bean
@Role(2)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(2)
public TransactionInterceptor transactionInterceptor() {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(this.transactionAttributeSource());
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
// 父类
@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableTx;
@Nullable
protected PlatformTransactionManager txManager;
public AbstractTransactionManagementConfiguration() {
}
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableTx = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
if (this.enableTx == null) {
throw new IllegalArgumentException("@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
}
}
@Autowired(required = false)
void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
if (!CollectionUtils.isEmpty(configurers)) {
if (configurers.size() > 1) {
throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
} else {
TransactionManagementConfigurer configurer = (TransactionManagementConfigurer)configurers.iterator().next();
this.txManager = configurer.annotationDrivenTransactionManager();
}
}
}
@Bean(name = {"org.springframework.transaction.config.internalTransactionalEventListenerFactory"})
@Role(2)
public TransactionalEventListenerFactory transactionalEventListenerFactory() {
return new TransactionalEventListenerFactory();
}
}
一共定义了以下4个Bean对象:
- TransactionalEventListenerFactory:创建事务事件监听器的工厂类。
- TransactionInterceptor:事务处理的拦截器。
- AnnotationTransactionAttributeSource:用于处理事务元数据的接口。
- BeanFactoryTransactionAttributeSourceAdvisor:事务中事务方法的通知。
现在可能难以理解它们实际的作用,但是继续往下解析,你就会知道它们在后面代理类的生成以及代理方法执行过程中锁发挥的作用。
生成代理类
再结合之前解析的IOC及AOP的初始化流程,代理类的生成流程大致如下:
- 通过InfrastructureAdvisorAutoProxyCreator对Bean进行前后置处理(Demo中UserService将@Transactionl注解添加在方法上,会在Bean初始化后调用applyBeanPostProcessorsAfterInitialization()方法进行回调处理);
- 获取BeanFactoryTransactionAttributeSourceAdvisor通知对Bean的类及方法上的解析@Transactional注解进行匹配;
- 从BeanFactoryTransactionAttributeSourceAdvisor中获取AnnotationTransactionAttributeSource对@Transactional中设置的属性进行解析处理;
- 讲解析后的属性缓存到RuleBasedTransactionAttribute中,像事务的传播行为、隔离机制、回滚规则等。
- 对满足通知切点规则的Bean进行动态代理,生成代理类。
[图片上传失败...(image-4f1792-1632446770718)]
处理流程
这里我们对数据库的访问操作使用的是Spring自己的模板封装类JdbcTemplate,当然除了它还有其他模板类,像NamedParameterJdbcTemplate
、SimpleJdbcInsert
、SimpleJdbcCall
、RDBMS 对象(包括MappingSqlQuery
、SqlUpdate
和StoredProcedure
),根据自己的选择需求选择即可。
由于我们Demo中UserService使用的是CGLIB动态代理,所以在执行handle()方法的时候,实际调用的是CglibAopProxy类中的intercept()方法,
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
Object target = null;
TargetSource targetSource = this.advised.getTargetSource();
Object var16;
try {
if (this.advised.exposeProxy) {
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = target != null ? target.getClass() : null;
//获取拦截器链
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
Object retVal;
if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = methodProxy.invoke(target, argsToUse);
} else {
//反射调用
retVal = (new CglibAopProxy.CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
}
retVal = CglibAopProxy.processReturnType(proxy, target, method, retVal);
var16 = retVal;
} finally {
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
AopContext.setCurrentProxy(oldProxy);
}
}
return var16;
}
上面代码是AOP中CGLIB动态代理的代理执行方法源码,这里不在详细讲解,我们主要关注getInterceptorsAndDynamicInterceptionAdvice()方法以及proceed()方法中的实现。
获取拦截器链
这里调用getInterceptorsAndDynamicInterceptionAdvice()方法就是为了获取通知中的拦截器链,
public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Method method, @Nullable Class<?> targetClass) {
// 生成缓存的key
AdvisedSupport.MethodCacheKey cacheKey = new AdvisedSupport.MethodCacheKey(method);
List<Object> cached = (List)this.methodCache.get(cacheKey);
if (cached == null) {
cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(this, method, targetClass);
this.methodCache.put(cacheKey, cached);
}
return cached;
}
这里主要为了对通知中获取的拦截器集合进行缓存,而获取的逻辑为交给getInterceptorsAndDynamicInterceptionAdvice()方法来实现的,
public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Advised config, Method method, @Nullable Class<?> targetClass) {
List<Object> interceptorList = new ArrayList(config.getAdvisors().length);
Class<?> actualClass = targetClass != null ? targetClass : method.getDeclaringClass();
boolean hasIntroductions = hasMatchingIntroductions(config, actualClass);
AdvisorAdapterRegistry registry = GlobalAdvisorAdapterRegistry.getInstance();
//获取通知的集合
Advisor[] var8 = config.getAdvisors();
int var9 = var8.length;
for(int var10 = 0; var10 < var9; ++var10) {
Advisor advisor = var8[var10];
MethodInterceptor[] interceptors;
if (advisor instanceof PointcutAdvisor) {
PointcutAdvisor pointcutAdvisor = (PointcutAdvisor)advisor;
if (config.isPreFiltered() || pointcutAdvisor.getPointcut().getClassFilter().matches(actualClass)) {
interceptors = registry.getInterceptors(advisor);
MethodMatcher mm = pointcutAdvisor.getPointcut().getMethodMatcher();
if (MethodMatchers.matches(mm, method, actualClass, hasIntroductions)) {
if (mm.isRuntime()) {
MethodInterceptor[] var15 = interceptors;
int var16 = interceptors.length;
for(int var17 = 0; var17 < var16; ++var17) {
MethodInterceptor interceptor = var15[var17];
interceptorList.add(new InterceptorAndDynamicMethodMatcher(interceptor, mm));
}
} else {
interceptorList.addAll(Arrays.asList(interceptors));
}
}
}
} else if (advisor instanceof IntroductionAdvisor) {
IntroductionAdvisor ia = (IntroductionAdvisor)advisor;
if (config.isPreFiltered() || ia.getClassFilter().matches(actualClass)) {
interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
} else {
Interceptor[] interceptors = registry.getInterceptors(advisor);
interceptorList.addAll(Arrays.asList(interceptors));
}
}
return interceptorList;
}
这段代码主要根据通知的类型,获取通知Advisor中的拦截器,这里获取的advisor就是前面事务管理配置定义的BeanFactoryTransactionAttributeSourceAdvisor
,而里面的拦截器就是TransactionInterceptor
,而我们也知道TransactionInterceptor
中还保存着AnnotationTransactionAttributeSource
和TransactionManager
。
执行代理方法
获取到拦截器链后,接下来我们看proceed()方法的源码,
@Nullable
public Object proceed() throws Throwable {
//执行完Interceptor则执行joinPoint
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return this.invokeJoinpoint();
} else {
Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
//要动态匹配joinPoint
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher)interceptorOrInterceptionAdvice;
//匹配运行时参数
return dm.methodMatcher.matches(this.method, this.targetClass, this.arguments) ? dm.interceptor.invoke(this) : this.proceed();
} else {
//执行当前Intercetpor
return ((MethodInterceptor)interceptorOrInterceptionAdvice).invoke(this);
}
}
}
这里逻辑主要就是先执行完所有拦截器中的invoke()方法,最后执行invokeJoinpoint()切点方法;我们已经知道这里获取的拦截器是TransactionInterceptor
,那我们来看看它invoke()的源码,
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method var10001 = invocation.getMethod();
invocation.getClass();
return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}
发现真正的执行逻辑是在invokeWithinTransaction()方法中实现的,
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
//获取事务元数据处理类
TransactionAttributeSource tas = this.getTransactionAttributeSource();
//获取事务属性
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
//获取事务管理器
PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
//获取切点方法
String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
Object result;
//编程式事务,TransactionManager提供的扩展回调接口
if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
TransactionAspectSupport.ThrowableHolder throwableHolder = new TransactionAspectSupport.ThrowableHolder();
try {
result = ((CallbackPreferringPlatformTransactionManager)tm).execute(txAttr, (status) -> {
TransactionAspectSupport.TransactionInfo txInfo = this.prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
Object var9;
try {
Object var8 = invocation.proceedWithInvocation();
return var8;
} catch (Throwable var13) {
if (txAttr.rollbackOn(var13)) {
if (var13 instanceof RuntimeException) {
throw (RuntimeException)var13;
}
throw new TransactionAspectSupport.ThrowableHolderException(var13);
}
throwableHolder.throwable = var13;
var9 = null;
} finally {
this.cleanupTransactionInfo(txInfo);
}
return var9;
});
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
} else {
return result;
}
} catch (TransactionAspectSupport.ThrowableHolderException var19) {
throw var19.getCause();
} catch (TransactionSystemException var20) {
if (throwableHolder.throwable != null) {
this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
var20.initApplicationException(throwableHolder.throwable);
}
throw var20;
} catch (Throwable var21) {
if (throwableHolder.throwable != null) {
this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw var21;
}
} else {
//声明式事务
//创建事务
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
result = null;
try {
result = invocation.proceedWithInvocation();
} catch (Throwable var17) {
//异常后事务处理
this.completeTransactionAfterThrowing(txInfo, var17);
throw var17;
} finally {
//清除事务信息
this.cleanupTransactionInfo(txInfo);
}
//成功返回后事务提交
this.commitTransactionAfterReturning(txInfo);
return result;
}
}
能看到在这个方法做了比较多的逻辑实现,而Spring中对事务的处理逻辑主要流程也是在这里得以实现的,上面这段代码实现的逻辑大致如下图
[图片上传失败...(image-959d0c-1632446770718)]
1.准备工作
首先获取事务元数据处理类AnnotationTransactionAttributeSource
、事务属性RuleBasedTransactionAttribute
、事务管理器DataSourceTransactionManager
,切点入口joinpointIdentification
,为后面的事务处理提前做准备。
2.处理策略
然后根据TransactionManager类型选择不同的处理,其中CallbackPreferringPlatformTransactionManager是公开用于在事务中执行给定回调的方法的扩展接口,也就是编程式事务,这里我们使用的是声明式事务,主要还是关注else后的TransactionManager的处理逻辑。
3.创建事务
接下来会调用createTransactionIfNecessary()方法来创建事务TransactionInfo
,
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
//封装DelegatingTransactionAttribute
if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取事务状态
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
//创建TransactionInfo
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}
首先会将txAttr封装成DelegatingTransactionAttribute,以便后续提供更多的功能;然后调用getTransaction()来创建事务并返回事务状态;最后调用prepareTransactionInfo()来创建TransactionInfo,并设置事务相关信息。
我们先看下getTransaction()的源码,
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//创建事务对象DataSourceTransactionObject
Object transaction = this.doGetTransaction();
boolean debugEnabled = this.logger.isDebugEnabled();
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
//事务的相关检查
if (this.isExistingTransaction(transaction)) {//嵌套事务处理
return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
} else if (((TransactionDefinition)definition).getTimeout() < -1) {//超时时间校验
throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
} else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {//事务传播行为判断
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {//事务传播行为判断
if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = this.getTransactionSynchronization() == 0;
//创建事务状态TransactionStatus
return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
} else {
//空挂起
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
if (debugEnabled) {
this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
}
try {
boolean newSynchronization = this.getTransactionSynchronization() != 2;
//创建事务状态TransactionStatus
DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//创建事务
this.doBegin(transaction, (TransactionDefinition)definition);
//记录事务同步设置
this.prepareSynchronization(status, (TransactionDefinition)definition);
return status;
} catch (Error | RuntimeException var7) {
//恢复挂起的事务
this.resume((Object)null, suspendedResources);
throw var7;
}
}
}
能看到这里对事务的创建做了很多的逻辑处理,分析后大致的流程如下:
- 创建事务对象DataSourceTransactionObject,事务保存点的设置;
- 对事务进行相关判断校验处理,包括嵌套事务处理、超时时间校验、事务传播行为属性的判断处理等
- 构建事务状态TransactionStatus
- 创建事务
- 事务同步设置
- 如果当前有事务挂起,执行完则将挂起的事务恢复
这里有两个地方需要特别关注,一个是不同传播行为属性的相关处理策略,另一个是doBegin()创建事务;下面我们会分析下doBegin()方法的逻辑实现,而行为属性的相关处理策略源码感兴趣可以自己了解下(其实也包含调用doBegin()去创建事务),
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//建立数据库连接
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//绑定连接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//设置事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
//将事务自动提交设置交给Spring管理
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//设置只读
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
//超时设置
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
//绑定连接资源到当前线程
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if (txObject.isNewConnectionHolder()) {
//释放连接
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
能看到调用底层数据库API来创建连接并设置事务属性都在这里进行了处理,像事务隔离级别、连接超时设置、自动提交、是否只读等设置,最后再将连接资源绑定到当前线程的事务同步管理器中进行维护管理,而同步管理器对这些事务相关信息的存放都是通过ThreadLocal
进行保存的。
[图片上传失败...(image-34a7af-1632446770718)]
4.执行代理方法
创建事务完成后,接下来就会执行代理方法,也就是Demo中handle()方法,通常这也就是我们业务层需要执行的sql方法,将业务数据增删改查到数据库中去。
5.异常事务处理
如果代理方法执行过程中发生了异常,这里会调用completeTransactionAfterThrowing()方法来进行判断处理,
protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
//异常回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException var6) {
this.logger.error("Application exception overridden by rollback exception", ex);
var6.initApplicationException(ex);
throw var6;
} catch (Error | RuntimeException var7) {
this.logger.error("Application exception overridden by rollback exception", ex);
throw var7;
}
} else {
try {
//事务提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException var4) {
this.logger.error("Application exception overridden by commit exception", ex);
var4.initApplicationException(ex);
throw var4;
} catch (Error | RuntimeException var5) {
this.logger.error("Application exception overridden by commit exception", ex);
throw var5;
}
}
}
}
上面主要就是为了判断发生的异常是否需要回滚,这里默认是通过rollbackOn()判断捕获的异常是否是RuntimeException
或Error
类型,如果我们在配置@Transactional注解的时候设置了触发回滚的异常类型,就是通过递归判断捕获的异常或父类是否是配置的异常类型,如果是就执行rollback()方法进行回滚,不是则调用commit()方法对事务正常提交。
我们来看下rollback()方法的源码是怎么进行回滚操作的,
public final void rollback(TransactionStatus status) throws TransactionException {
//事务已经完成则会抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
this.processRollback(defStatus, false);
}
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
//前置回调
this.triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Rolling back transaction to savepoint");
}
//有保存点则回退到保存点
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction rollback");
}
//独立事物则会滚
this.doRollback(status);
} else {
if (status.hasTransaction()) {
if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
this.logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
} else {
if (status.isDebug()) {
this.logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
//非独立事务,则进行标记,执行完后统一进行回滚
this.doSetRollbackOnly(status);
}
} else {
this.logger.debug("Should roll back transaction but cannot - no transaction available");
}
if (!this.isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (Error | RuntimeException var8) {
//后置回调
this.triggerAfterCompletion(status, 2);
throw var8;
}
//后置回调
this.triggerAfterCompletion(status, 1);
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} finally {
//清除事务相关资源信息
this.cleanupAfterCompletion(status);
}
}
通过上面的代码我们能看到,对于异常的回滚并不是直接调用底层数据库API进行回滚,而是做了很多逻辑判断处理,处理逻辑如下:
- 触发TransactionSynchronization的前置回调
- 对于嵌套事务中保存点的回滚处理,其中内部事务的异常并不会影响外部事务的回滚
- 对于独立的事务则会直接调用底层数据库API进行回滚
- 对于以上两种情况之外的事务,则会先进行setRollbackOnly标识,等事务执行完后统一进行回滚
- 触发TransactionSynchronization的后置回调
- 回滚后清除事务相关资源信息,像事务同步管理器中的设置、数据库连接资源的释放重置以及与当前线程的解绑等,并恢复挂起的事务
6.清除事务信息
上述执行完成后会调用cleanupTransactionInfo()方法来通过恢复线程本地状态的方式清除当前事务信息。
7.返回后事务提交
如果代理方法执行过程中没有任何异常发生,则会调用commitTransactionAfterReturning()方法走正常的事务提交,我们来看看提交的逻辑处理,
protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
public final void commit(TransactionStatus status) throws TransactionException {
//事务已经完成则会抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Transactional code has requested rollback");
}
//回滚
this.processRollback(defStatus, false);
} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
//回滚
this.processRollback(defStatus, true);
} else {
//提交
this.processCommit(defStatus);
}
}
}
这里先对事务状态的RollbackOnly回滚标识进行判断处理,如果标记了则会调用processRollback()进行回滚操作,具体的回滚处理逻辑前面也已经解析过了,我们主要来看看processCommit()方法对于正常提交的逻辑处理,
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
//扩展空接口
this.prepareForCommit(status);
//前置回调
this.triggerBeforeCommit(status);
this.triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//清除保存点
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//独立事务则直接提交
this.doCommit(status);
} else if (this.isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException var17) {
this.triggerAfterCompletion(status, 1);
throw var17;
} catch (TransactionException var18) {
if (this.isRollbackOnCommitFailure()) {
//提交过程中异常则回滚
this.doRollbackOnCommitException(status, var18);
} else {
//后置回调
this.triggerAfterCompletion(status, 2);
}
throw var18;
} catch (Error | RuntimeException var19) {
if (!beforeCompletionInvoked) {
this.triggerBeforeCompletion(status);
}
//提交过程中异常则回滚
this.doRollbackOnCommitException(status, var19);
throw var19;
}
try {
//后置回调
this.triggerAfterCommit(status);
} finally {
this.triggerAfterCompletion(status, 0);
}
} finally {
//清除事务相关资源信息
this.cleanupAfterCompletion(status);
}
}
上面的逻辑处理能看到这里对于没有被异常捕获的的事务也并非直接提交,而是做了很多逻辑处理,仔细看下能发现和回滚操作中的逻辑处理很相似,除了具体的执行方式不一样,它们的判断逻辑基本都是一样的。
总结
到此Spring中的事务就已经全部解析完成了,但是有关事务的后续,像多数据源的扩展、分布式事务的实现方式等,还并没有结束,同志们仍需努力,加油。同时这里虽然有很多源码细节并没有展开具体分析,但是整个事务的流程已经体现出来了。如果有机会,会针对某些类、实现细节或功能等再做单独分析。
把一件事做到极致就是天分!