在经历的几轮面试中,每一轮都问到了事务相关的内容,让我越发感到事务的重要性。
如:
MySQL事务隔离级别?分别解释下他们的含义,默认的事务隔离级别是什么,Oracle的呢?
Spring事务传播级别?分别代表什么含义
Spring事务是如何处理的?自己能写出来吗?
那么今天一起看一下Spring的事务处理方式。我自己想手写事务的时候,发现还是太依赖Spring框架提供的功能了,自己写对我来说还是有一定的难度,在此分析一下Spring的实现方式。
整体结构
Spring初始化概览
Spring整个框架包含很多的过程,其中每一个方法内部都包含了很多要处理的事情,都是细节性的问题,我们先不去深究那些细节,感兴趣的可以自己看看。
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
// Destroy already created singletons to avoid dangling resources.
destroyBeans();
// Reset 'active' flag.
cancelRefresh(ex);
// Propagate exception to caller.
throw ex;
}
finally {
// Reset common introspection caches in Spring's core, since we
// might not ever need metadata for singleton beans anymore...
resetCommonCaches();
}
}
}
关于事务的AOP
之前有文章介绍了Spring AOP入门与Advice相关的内容,我之前的文章有写,AOP利用的是动态代理,在我们的方法中,有些方法我们需要进行增强,比如方法周围加上事务的处理,AOP中有个接口是 方法拦截器,借助这个接口我们可以在想要操作的方法外加一些操作。
事务拦截的对象是TransactionInterceptor,可以看出它继承了TransactionAspectSupport. TransactionAspectSupport内部是真正的操作部分。
事务的关键对象
Spring事务中有几个对象很重要,理解了这几个对象就相当于抓住了总体,剩下的一些细节多花些时间就懂了。
-
PlatformTransactionManager 事务管理器,听名字就知道它是管理事务的操作的,它只包含三个方法。获取事务,回顾事务,提交事务
-
TransactionDefiition 定义事务的类型,事务包含很多属性,是否可读,事务隔离级别,事务传播级别。通过事务的定义,我们根据定义获取特定的事务。
-
TransactionStatus 代表一个事务运行的状态,事务管理器通过状态可以知道事务的状态信息,然后进行事务的控制。事务是否完成,是否是新的事务,是不是只能回滚等。
事务处理源码
- 首先获取我们定义的事务属性,可能是定义在XML中,也可能是定义在注解上,总之我们是能获取到定义的事务属性的。
- 根据定义的事务属性获取PlatformTransactionManager,然后获取加入点的标识
- 事务处理部分275行开始。
- 判断是不是要新建事务,最后将事务等相关信息保存在txInfo对象中
- 执行事务内的代码,一般是我们的程序代码。282行
- 失败了执行回滚事务
-
如果没有抛出异常提交事务
创建事务后返回的对象是TrnsactionInfo对象,因此有必要看一下这个对象都包含了什么信息:
- 事务管理器
- 定义的事务属性
- 切入点标识
- 事务状态
-
上一个事务状态
事务传播级别的处理
事务传播级别处理是事务中的一个重点,那么源码中如何处理的呢?从创建事务部分开始看
- 处理事务属性标识,标识大家都理解就是这个谁的属性
- 如果属性不为空,并且书屋管理器不为空那么,获取事务tm.getTransaction.
- 最后准备TransactionInfo
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
tm.getTransaction,如何获取事务的呢?
- doGetTransaction是抽象方法,由其父类实现,看名字知道这个是真正获取事务对象的方法,单数据源的一般实现是DatasourceTransactionManager.
- 创建事务的定义,TransactionDefinition,前面有说到这个对象的内容,定义事务的类型等
- 如果已经存在了事务,根据事务的传播级别进行存在事务处理
- 如果不存在事务,根据定义设置事务的超时时间,是否只读,是否新建事务
真正启动事务的是doBegin方法,其内部把autoCommit设置为false
里面还涉及到一个对象TransactionSynchronizationManager,事务同步管理器,主要是一个Map对象,映射当前Datasource到当前的连接。
另外可以看到,如果是新的事务,当传播级别为这三个的时候,会新建事务
TransactionDefinition.PROPAGATION_REQUIRED
TransactionDefinition.PROPAGATION_REQUIRES_NEW TransactionDefinition.PROPAGATION_NESTED
当传播界别为PROPAGATION_MANDATORY,抛出异常
// AbstractPlatformTransactionManager对象
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
对存在的事务进行处理
- 如果传播级别为Never抛异常
- 如果传播级别为NOT_SUPPORTS,那么挂起存在的事务,直接返回不处理事务
- 如果传播界别为PROPAGATION_REQUIRES_NEW,新建事务,同样是doBegin方法
- 如果传播级别是PROPAGATION_NESTED,判断是不是JDBC3.0,是否支持savePoint,以及是否使用savePoint进行新事物的操作
- 最后做一些验证的操作
如何得知是否存在事务了呢?
事务对象是否有Connection,并且Connection的事务是否活跃,
(txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
Transaction对象是由TransactionSynchronizationManager来控制的。其判断当前的数据源是否有连接对应,用的是ThreadLocal
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
最后
关于Spring事务处理的部分,先说到这里。主要是提到了事务传播级别与Spring事务的大体架构,更多细节的内容,还要一起探索啊。