首先要明白一点:需要 Spring 管理 Mybatis 的事务,它两运行时必须在同一个 Connection 的同一事务下。明白这一点就比较容易了。接下来就是想方设法的让它两在同一个连接的同一事务下。如果你熟悉 Spring 的话,肯定能猜到它就是使用了动态代理。
Mybatis 中最主要的几个类:
- SqlSession
- Executor
- SqlSessionFactory
- SqlSessionFactoryBuilder
- Transaction
Mybatis-Spring 自动配置类 :
- SqlSessionTemplate
- MybatisAutoConfiguration
- SqlSessionFactoryBean
- MapperProxy
- MapperProxyFactory
重要类简单介绍
1. SqlSessionTemplate
SqlSessionTemplate 是 SqlSession 的实现类,它用来替代 Mybatis 原来的 DefaultSelSession。它在 MapperProxyFactory 中被封装进代理类。
SqlSessionTemplate 中一个重要属性:sqlSessionProxy。它也是一个 SqlSession,它在 SqlSessionTemplate 构造方法中创建。但是仔细看它的创建,它创建的一个 SqlSession 的代理类,使用的 JDK 动态代理。InvocationHandler 是 SqlSessionInterceptor,源码如下:
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 获取 SqlSession,首先从 TransactionSynchronizationManager.getResource 获取,如果没有则之间调用 SqlSessionFactory 创建一个新的 SqlSession (默认是 DefaultSqlSession)
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
// 调用 DefaultSqlSession 的方法
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
// release the connection to avoid a deadlock if the translator is no loaded. See issue #22
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
之后 MapperMethod 中调用时所有的方法都会点用这个 invoke 方法。
2. SpringManagedTransactionFactory,SpringManagedTransaction
SpringManagedTransactionFactory 是 TransactionFactory 的实现类,在创建 Mybatis 的 Confituration 的 Environment 时创建,具体可以看 SqlSessionFactoryBean#buildSqlSessionFactory 方法,源码如下:
protected SqlSessionFactory buildSqlSessionFactory() throws IOException {
Configuration configuration;
...
...部分省略
if (this.transactionFactory == null) {
// 创建 SpringManagedTransactionFactory
this.transactionFactory = new SpringManagedTransactionFactory();
}
// 把 SpringManagedTransactionFactory 和 datasource 放入 Environment
configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource));
...
...部分省略
return this.sqlSessionFactoryBuilder.build(configuration);
}
SpringManagedTransactionFactory 源码:
public class SpringManagedTransactionFactory implements TransactionFactory {
/**
* {@inheritDoc}
*/
@Override
public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
return new SpringManagedTransaction(dataSource);
}
...
...
很简单就是返回一个 SpringManagedTransaction。那么 SpringManagedTransactionFactory 在何时使用呢?在 SqlSessionFactory 创建 SqlSession 时使用,源码如下:
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
可以看到 Transaction 最后被放入了执行器 Executor。
这里说下,Mybatis 大概的执行流程,MapperProxy->MapperMethod->SqlSession -> Executor->Statement
接下来看下 SpringManagedTransaction 源码如下:
// 获取连接
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
// 通过 DataSourceUtils 获取连接
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.connection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
DataSourceUtils.getConnection(this.dataSource);
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
}
catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
}
catch (IllegalStateException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
}
}
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
// TransactionSynchronizationManager 通过 dateSource 获取 connection
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
// ThreadLocal 中没有 connection 则从连接池中获取
logger.debug("Fetching JDBC Connection from DataSource");
Connection con = fetchConnection(dataSource);
// 如果开启事务同步的话就放入 ThreadLocal
if (TransactionSynchronizationManager.isSynchronizationActive()) {
try {
// Use same Connection for further JDBC actions within the transaction.
// Thread-bound object will get removed by synchronization at transaction completion.
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
catch (RuntimeException ex) {
// Unexpected exception from external delegation call -> close Connection and rethrow.
releaseConnection(con, dataSource);
throw ex;
}
}
return con;
}
这里如果使用了事务的话,TransactionSynchronizationManager.getResource(dataSource) 返回的不会是一个null。接下来就分析下,Spring 是何时把 ConnectionHolder 放入到 TransactionSynchronizationManager 的。Spring 使用了一个 TransactionInterceptor,它是 MethodInterceptor 的实现。
中间的调用过程
ReflectiveMethodInvocation#proceed ->
TransactionInterceptor#invoke ->
TransactionAspectSupport#invokeWithinTransaction ->
TransactionAspectSupport#createTransactionIfNecessary ->
AbstractPlatformTransactionManager#getTransaction ->
DataSourceTransactionManager#doBegin
最终它会进入到 DataSourceTransactionManager#doBegin,源码如下:
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
:txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 把 ConnectionHolder 通过 TransactionSynchronizationManager 绑定到 ThreadLocal
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
最后
最后我们把这个逻辑捋下:
- 当我们调用代理方法是,如果有 TransactionInterceptor 拦截器在,调用 拦截器方法
- 使用 AbstractPlatformTransactionManager 的 doBegin 方法(根据不同的事务管理器实现),一般我们使用数据源事务管理器,把 Connection 绑定到 ThreadLocal
- Mybatis Executor 在执行时,使用 SpringManagedTransaction 获取第二步中的 Connection