最近在看源码发现一个问题,在看到DefaultSqlSession这个类的源码的时候,发现这个类上有一句注释,
Note that this class is not Thread-Safe意思说说,此类不是线程安全的,及既然不是线程安全的,怎么还是默认实现那
接下来,我们就一起从源码的角度分析一下,我们写一个小案例,然后通过案例一起分析下,这里我们以查询为主,代码很简单,就是一个简单的查询,我们定义了一个线程,通过countDownLauntch让他们同时请求,我们先执行下,看看结果
@RunWith(SpringRunner.class)
@SpringBootTest
public class DefaultSqlSessionTest {
private static final int COUNT = 10;
private static CountDownLatch count = new CountDownLatch(COUNT);
private SqlSession sqlSession;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Before
public void init(){
//这里的sqlSession是DefaultSqlSession中的sqlSession
sqlSession = sqlSessionFactory.openSession();
}
@After
public void destory(){
//直接调用DefaultSqlSession,一定记得手动关闭下sqlSession
sqlSession.close();
}
@Test
public void defaultSqlSessionSafeTest() throws InterruptedException {
for (int i = 0;i<10;i++){
new Thread(() ->{
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
getAccount2();
}).start();
count.countDown();
}
Thread.sleep(5000);
}
private void getAccount1() {
sqlSession.select("selectByPrimaryKey", 1,resultContext ->{
RyxAccount ryxAccount = (RyxAccount)resultContext.getResultObject();
System.out.println(ryxAccount);
});
}
private void getAccount2(){
sqlSession.selectList("selectByPrimaryKey",1);
}
执行完之后,我们看到的是,报错了,报的是一个强转异常,怎嘛会报这个强转异常嘞,我们器跟着源码,看看,根据打印的堆栈信息,我们进入到源码的DefaultSqlSession这个类一探究竟
当我们执行查询的时候,会调用DefaultSqlSession类下的selectList这个方法,我们接着往下看
@Override
public <E> List<E> selectList(String statement, Object parameter) {
return this.selectList(statement, parameter, RowBounds.DEFAULT);
}
@Override
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
//获取执行的sql语句
MappedStatement ms = configuration.getMappedStatement(statement);
//执行查询
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
进入到CacheExecuter类下的 query方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
//获取执行的sql语句
BoundSql boundSql = ms.getBoundSql(parameterObject);
//创建缓存,注意,这个地方,调用的BaseExecuter中的createCacheKey方法
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
这个方法具体就是将当前的sql语句,等一些类信息,按照指定规则拼装成一个key,然后返回,具体就不再分析了
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
接下来,我们拿到了一个这个key,接着往下看,delegate.<E> query这个方法,是真正的查询加添加到缓存中的方法实现,这段代码比较简单,就不做分析了,直接进入到下一个方法,BaseExecuter.query
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
BaseExecuter类
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
//这是一个三元运算符,resultHandler 是否为空,如果为空,就去缓存中取内容,否则设置为null,
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
2.1
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
我们重点分析下,一个三元代码 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
试想一个场景,当两个线程现在开始执行查询账户的业务,线程T_SQL_1和T_SQL_2
1:T_SQL_1先拿到线程执行权,会先调用createCacheKey,如果没有,则会创建这个key,此时假如是第一次查询,localCache.getObject(key)中还不存在key,则list为null
2:执行queryFromDatabase(见2.1)方法,会在这里先给key添加一个默认占位符EXECUTION_PLACEHOLDER
3:然后在这个时候,T_SQL_2获得了线程执行权,调用上面的localCache.getObject(key),获得value:EXECUTION_PLACEHOLDER
4:localCache.getObject(key)此时就不是null了,然后程序开始转换啊,就会变成如下代码,我们模拟下这个解析过程,看个demo
如图,简单模拟了下,如下过程,得到就是强转异常,说明问题就是出现在这里,由于线程争夺资源的问题,这里拿到的key其实是占位符
而不是具体从数据库查询出来的值,谜底终于解开了,原来问题出现在这里,
我们继续研究这句代码list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
resultHandler 这个参数,如果不为空,也就意味着,一级缓存也就失效了,也就不用去缓存中取找了,所以当你使用流式查询的时候,是不会出现这个问题的,因为就不会走缓存,都是查询数据库,不存在缓存的问题
最终的结论是,我们最好不要自己轻易使用DefaultSqlseesion直接去调用查询sql,很容易因为并发问题导致转换异常
当然,既然mybatis的源码大神们早都知道这个DefaultSqlSession这个类线程安全的问题,肯定要处理啊,我们接下来看看他们是怎么处理的,我们看源码中sqlSession接口实现类中看到了一共有三个实现类如图
分别是
1:DefaultSqlSession(已分析)
2:SqlSessionManager(mybatis处理DefaultSqlSession的线程安全管理类)
3:SqlSessionTemplete(spring框架处理mybatis的线程安全的处理框架)
我们先分析下SqlSessionManager,看一下这个类,我们截取一段代码
我们又看到了熟悉的jdk代理技术,当调用SqlSessioManager的查询语句的时候,会先调用SqlSessionInterceptor
这里翻译为拦截器很恰当,我们看到,会去ThreadLocal中获取sqSession,获取不到,就去创建一个DefaultSqSession对象
这样的话,相当于每个线程持有自己的DefaultSqlSession对象,所以,当不同的线程访问的时候,一级缓存也就失效了,
public class SqlSessionManager implements SqlSessionFactory, SqlSession {
private final SqlSessionFactory sqlSessionFactory;
private final SqlSession sqlSessionProxy;
private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<SqlSession>();
private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[]{SqlSession.class},
new SqlSessionInterceptor());
}
public static SqlSessionManager newInstance(Reader reader) {
return new SqlSessionManager(new SqlSessionFactoryBuilder().build(reader, null, null));
}
.....省略
private class SqlSessionInterceptor implements InvocationHandler {
public SqlSessionInterceptor() {
// Prevent Synthetic Access
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//去ThreadLocal中获取sqlSesison,如果获取不到,
final SqlSession sqlSession = SqlSessionManager.this.localSqlSession.get();
if (sqlSession != null) {
try {
return method.invoke(sqlSession, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
} else {
//当qlSession为null,调用openSession方法,然后去调用sqlSessionFactory.openSession()
//创建一个DefaultSqlSession的对象,
final SqlSession autoSqlSession = openSession();
try {
final Object result = method.invoke(autoSqlSession, args);
autoSqlSession.commit();
return result;
} catch (Throwable t) {
autoSqlSession.rollback();
throw ExceptionUtil.unwrapThrowable(t);
} finally {
autoSqlSession.close();
}
}
}
}
SqlSessionFactory
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
分析了这么多,我们来个案例,来验证下SqlSessionManager,案例很简单,就不做分析了,我改了下源码,打印了日志,
@RunWith(SpringRunner.class)
@SpringBootTest
public class DefaultSqlSessionManagerTest {
private static final int COUNT_THREAD = 10;
private static CountDownLatch count = new CountDownLatch(COUNT_THREAD);
private SqlSessionManager sqlSessionManager;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Before
public void init(){
sqlSessionManager = SqlSessionManager.newInstance(sqlSessionFactory);
}
@Test
public void sqlSessionManagerTest() throws InterruptedException {
for (int i = 0;i<COUNT_THREAD;i++){
new Thread(() ->{
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
getAccount1();
}).start();
count.countDown();
}
Thread.sleep(5000);
}
private void getAccount1() {
sqlSessionManager.startManagedSession();
sqlSessionManager.
selectList("selectByPrimaryKey",1);
}
}
这是一部分日志,可以看到每个线程生成了新的Slqsession,所以也就保证了线程安全
SqlSessionManager可以允许我们将sqlSession设置到ThreadLoacl中,这样也可以保证DefaultSqlSession线程安全
具体就是添加一句如下代码sqlSessionManager.startManagedSession();这样,我们就为每个线程分配了一个SqlSession并存储到
ThreadLocal中,这样也是一样的效果,通过ThreadLocal,get方法会获取到具体的sqlSession对象,但是这里有个问题,由于这个ThradLocal是私有的,set完之后,在关闭后,清除ThreadLocal中的内容实在关闭sqlSession后,就是在这里
@Override
public void close() {
final SqlSession sqlSession = localSqlSession.get();
if (sqlSession == null) {
throw new SqlSessionException("Error: Cannot close. No managed session is started.");
}
try {
sqlSession.close();
} finally {
//直接将ThreadLocal中的当前线程变量sqlSession设置为null
localSqlSession.set(null);
}
}
ok,SqlSessionManager就分析到这里,代码还是比较简单的,就到这里,下一期,我们一起看下spring到底是怎样保证defaultSqlSession线程安全的,
Thanks!
更多博客,请移步到博主技术博客https://renyuanxin.top