我们看事务的源码,不仅是为了更好地使用Spring,而且能够从源码中学习到编程思想,设计思想。这篇文章框架如图所示:
一、相关知识
MySQL事务
- 事务的启动
- 显式启动事务语句, begin 或 start transaction。配套的提交语句是 commit,回滚语句是 rollback。
- set autocommit=0,这个命令会将这个线程的自动提交关掉。意味着如果你只执行一个 select 语句,这个事务就启动了,而且并不会自动提交。这个事务持续存在直到你主动执行 commit 或 rollback 语句,或者断开连接。建议设置set autocommit=1,避免造成长事务。
- 事务的隔离性:未提交、提交读、可重复读、串行化
- MVCC
MySQL的默认隔离级别是可重复读,下面以可重复度隔离级别谈谈MVCC是怎么实现可重复读的。
MySQL45讲
begin/start transaction 命令并不是一个事务的起点,在执行到它们之后的第一个操作 InnoDB 表的语句,事务才真正启动。如果你想要马上启动一个事务,可以使用 start transaction with consistent snapshot 这个命令。
第一种启动方式,一致性视图是在执行第一个快照读语句时创建的;第二种启动方式,一致性视图是在执行 start transaction with consistent snapshot 时创建的。
对于MySQL的每一行上,有一个row trx_id,表示当前的版本号,每次事务提交之后,会新增一个版本行,新的版本行的row trx_id更新为事务的id
每个事务所能看到的一致性视图,就是事务启动瞬间已提交的行。如果事务的行还未提交,那么需要回滚到上一个版本。
- undo log回滚
我们看到的上图的箭头,就是 undo log;而 V1、V2、V3 并不是物理上真实存在的,而是每次需要的时候根据当前版本和 undo log 计算出来的。比如,需要 V2 的时候,就是通过 V4 依次执行 两次undo log U3、U2 算出来。
ThreadLocal
ThreadLocal是Java线程封闭的一种方式,将变量局限在线程中,每个线程都有一份数据,避免了变量的共享,也就避免了线程安全的问题。
具体的实现可以看:https://www.jianshu.com/p/22ca4db0a616
Spring AOP
- AOP的一些概念
1.1 Join point和pointcut区别
Join point 就是菜单上的选项,Pointcut就是你选的菜。Join point 你只是你切面中可以切的那些方法,一旦你选择了要切哪些方法,那就是Pointcut。
也就是说,所有在你程序中可以被调用的方法都是Join point. 使用Pointcut 表达式,那些匹配的方法,才叫Pointcut。所以你根本不用关心Join point。比如你有10个方法,你只想切2个方法,那么那10个方法就是Join point, 2个方法就是Pointcut
1.2 Advice
advice就是你作用到pointcut上的方式,你可以使用befor, after 或者around
1.3 advisor
advisor就是作用在具体对象上的ponitcut和advice,把ponitcut和advice合起来就是advisor。切到哪个具体方法上面,是使用的befor、after 还是around,就这个就叫advisor. - 说到Spring AOP,就得谈一下Bean的生命周期。我们知道Bean的生命周期有实例化、属性注入、初始化、销毁。Spring的AOP是在Bean初始化的过程中,生成代理类并返回的。
Spring AOP的源码可以总结为以下内容:
1)通过AutoProxyRegistrar类将AbstractAutoProxyCreator注册到Spring容器中
2)AbstractAutoProxyCreator类的postProcessBeforeInstantiation()只有在自定义 TargetSource的时候才会返回代理后的Bean,否则会继续实例化Bean
3)AbstractAutoProxyCreator类的postProcessAfterInitializatio会在Bean实例化之后调用,里面有个wrapIfNecessary方法获取Advisors,然后创建代理
Object[] specificInterceptors = this.getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, (TargetSource)null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = this.createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
} else {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
4)调用bean方法时通过proxy来调用,proxy依次调用增强器的相关方法,来实现方法切入getBean可以获取到代理后的Bean
Spring 拦截器
Spring事务的切面实现是TransactionInterceptor,里面的Invoke方法是实现了MethodInterceptor的invoke接口,至于Spring的拦截器底层原理是怎么样的呢,还需要再研究研究。
二、源码实现
整体代码流程分析
Spring事务是怎么通过AOP调用到TransactionInterceptor的invoke方法的,感兴趣的可以看下这篇文章:https://my.oschina.net/u/3748038/blog/1602438。总体来说就是SpringBoot的有一个TransactionAutoConfiguration自动配置类,自动配置了@EnableTransactionManagement注解,这个注解作用就是开启事务。具体就是有一个AutoProxyRegistrar注册了,初始化AbstractAutoProxyCreator,AbstractAutoProxyCreator实现了BeanProcessors,所以在Bean实例化前后时候分别调用
postProcessBeforeInitialization(Object bean, String beanName) -- 实例化前执行
Object postProcessAfterInitialization(Object bean, String beanName) --实例化后执行
。然后可以发现在postProcessAfterInitialization方法中调用了wrapIfNecessary,一直往下追踪,可以看到最终会调用ransactionAttributeSource的getTransactionAttribute()方法,这里就用到了那些parser类去真正解析当前bean是否包含那些注解,如果包含注解,那么就把当前类包装成代理类。还有就是事务代理类在执行方法之前调用的切面是BeanFactoryTransactionAttributeSourceAdvisor的advice,亦即TransactionInterceptor,这样大概就能串起来了。
-
TransactionInterceptor的invoke,
然后调用子类TransactionAspectSupport的invokeWithinTransaction方法,这里又是模板方法的意思。
- 关键流程代码都在invokeWithinTransaction方法里,下面只保留了关键代码
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//获取事务管理器,这里默认获取到的是DataSourceTransactionManager,
//具体原因不明。。
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 通过事务传播隔离级别判断是否需要开启新事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
//调用目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 发生unchecked异常,或者error的话,需要回滚。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
-
createTransactionIfNecessary方法,判断是否 需要开启新事务,是根据事务传播条件判断的,具体的判断,在下面讲。这个方法主要关注getTransaction方法
事务管理器
getTransaction中的doGetTransaction、isExistingTransaction、doBegin等调用的都是DataSourceTransactionManager中的方法,事务管理器保存了dataSource,dataSource中有数据库的连接,在事务中的每一个语句的执行,都使用这个连接执行。保证事务的执行,总是复用同一个连接。
doBegin的代码,忽略无关代码
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//保存之前的隔离级别,因为当前事务可能是子事务,等当前执行完了,还要再执行
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 设置是否自动提交,这里的自动提交的含义在上面说过了
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
//设置超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
//连接绑定到线程,这里也是使用的ThreadLocal
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
doCommit和doRollback源码比较简单,这里就不开展说了。
事务传播机制
Spring事务是在getTransaction里面判断,是否要开启新事务,判断的主要依据是@Transactional的propagation属性。
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
然后我们可以看下源码,是有以下几种枚举值:
public enum Propagation {
//支持当前事务,如果当前没有事务,就新建一个,是默认的传播机制
REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED),
//支持当前事务,当前没有事务就非事务地执行
SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS),
//支持当前事务,如果没有的话,就抛出异常S
MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY),
//创建新事务,并且挂起当前事务
REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),
// 非事务地执行,如果当前有事务就挂起
NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED),
//非事务地执行,如果当前存在事务就抛出异常
NEVER(TransactionDefinition.PROPAGATION_NEVER),
// 如果当前事务存在,就执行嵌套事务
NESTED(TransactionDefinition.PROPAGATION_NESTED);
}
上面的注释都是基于源码注解的理解,接下来我们看下实际的执行情况,看看对于不同的传播方式,Spring会做哪些处理。依然省略无关代码
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
//这里调用的是事务管理器的方法,获取到当前的事务
Object transaction = doGetTransaction();
//省略无关代码........
//调用事务管理器的isExistingTransaction,判断是否存在事务,如果存在事务的话,调用handleExistingTransaction
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
//省略无关代码.....
// 如果是MANDATORY,而且当前不存在事务,那么抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//如果是REQUIRED,或者是REQUIRES_NEW,或者是NESTED,那么开启新事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
//省略无关代码
}
else {
// 创建一个空事务,没有实际的事务(创建这个事务的意图不是很理解。。。)
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
接下来,我们看看对于已存在事务的情况,Spring是怎么处理的。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//如果已存在事务,NEVER传播级别,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//NOT_SUPPORTED的话,中止当前事务,然后准备事务,这里并没有开启事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// REQUIRES_NEW,挂起当前事务,新建事务,并开启事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
//NESTED隔离级别
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
//设置了安全点,给已存在的事务创建安全点,应该是为了子事务失败之后,父事务不回滚。
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// 否则开启新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// SUPPORTS or REQUIRED.
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
看了那么多源码,我们可以总结出以下规律
回滚和提交的条件
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
//省略异常代码
}
else {
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
//省略异常代码
}
}
}
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
判断rollbackOn方法,成立的时候才会回滚,而对于rollbackOn,判断的是RuntimeException和error两种情况,所以说,其他的异常都会提交事务。
三、使用最佳实践
private修饰方法不生效
Spring事务基于AOP实现,而AOP是通过JDK代理,或者Cglib代理,如果是代理接口,使用的是JDK代理,否则使用CGLIB代理。对于private修饰的方法,会使用Cglib,但是Cglib的原理是对指定的目标类动态生成一个子类,并覆盖其中方法实现增强,但因为采用的是继承,所以不能对final修饰的类和final方法和private方法进行代理。
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
// 1.config.isOptimize()是否使用优化的代理策略,目前使用与CGLIB
// config.isProxyTargetClass() 是否目标类本身被代理而不是目标类的接口
// hasNoUserSuppliedProxyInterfaces()是否存在代理接口
if (config.isOptimize() || config.isProxyTargetClass() ||hasNoUserSuppliedProxyInterfaces(config)) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: " + "Either an interface or a target is required for proxy creation.");
}
// 2.如果目标类是接口或者是代理类,则直接使用JDKproxy
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
// 3.其他情况则使用CGLIBproxy
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
方法嵌套不生效
这里直接引用极客时间的内容~
Spring 通过 AOP 技术对方法进行增强,要调用增强过的方法必然是调用代理后的对象。我们尝试修改下 UserService 的代码,注入一个 self,然后再通过 self 实例调用标记有 @Transactional 注解的 createUserPublic 方法。设置断点可以看到,self 是由 Spring 通过 CGLIB 方式增强过的类:CGLIB 通过继承方式实现代理类,private 方法在子类不可见,自然也就无法进行事务增强;this 指针代表对象自己,Spring 不可能注入 this,所以通过 this 访问方法必然不是代理。
public int createUserWrong2(String name) {
try {
this.createUserPublic(new UserEntity(name));
} catch (Exception ex) {
log.error("create user failed because {}", ex.getMessage());
}
return userRepository.findByName(name).size();
}
//标记了@Transactional的public方法
@Transactional
public void createUserPublic(UserEntity entity) {
userRepository.save(entity);
if (entity.getName().contains("test"))
throw new RuntimeException("invalid username!");
}
传播方式使用
异常回滚的设置
通过上面源码分析可知,Spring事务,默认只会对RuntimeException和error的异常进行回滚,如果想让未受检异常也回滚,需要在注解上加上
rollbackFor = Exception.class