mysql咱们都在用,可能有一半左右的公司可能都没有接触到数据量大了之后会出现的问题吧,因为业务就没有那样大,但是当咱们数据库里面数据量大了之后,去查询大量数据
就会出现问题,什么问题那?原来,mysql会一次性的将数据查询出来后放入内存中,然后在返回到页面,但是当数据量巨大,这个时候就会撑爆咱们的内存,导致OOM,我们公司
用户量4000多万,交易数据大概有几十亿条,这样的数据量,如果查询,秒级崩溃,咱们怎嘛解决那,一个比较常见的方式是使用分页,那马除了分页,还有其他方法吗?接下来,就
是这篇文章要讨论的myabtis自带的流式查询,咱们一起来看一下
xml映射文件,就是一个简单的查询
<select id="selectTest" resultMap="base_result_map">
select <include refid="base_column_list" /> from `ryx_account` where id BETWEEN 1 AND #{id}
</select>
mapper
public List<RyxAccount> selectTest(@Param("id")int id);
controller
@RequestMapping("/findPage3")
public String findPage() throws Exception {
StopWatch stopWatch = new StopWatch("Test");
stopWatch.start();
sqlSessionTemplate.select("selectTest", 5000, resultContext -> {
final RyxAccount ryxAccount = (RyxAccount) resultContext.getResultObject();
System.out.println(Thread.currentThread().getName()+"线程"+JSON.toJSONString(ryxAccount));
});
stopWatch.stop();
System.out.println(stopWatch.prettyPrint());
return "ok";
}
启动后咱们访问后,可以看到
http-nio-8089-exec-2线程{"createTime":1561480357000,"id":1,"money":900,"name":"张三","updateTime":1561491458000}
http-nio-8089-exec-2线程{"createTime":1561480367000,"id":2,"money":1100,"name":"李四","updateTime":1561491458000}
http-nio-8089-exec-2线程{"createTime":1568732427000,"id":3,"money":0,"name":"_NAME0","updateTime":1568732427000}
http-nio-8089-exec-2线程{"createTime":1568732427000,"id":4,"money":0,"name":"_NAME0","updateTime":1568732427000}
http-nio-8089-exec-2线程{"createTime":1568732427000,"id":5,"money":1,"name":"_NAME1","updateTime":1568732427000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":6,"money":0,"name":"_NAME0","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":7,"money":1,"name":"_NAME1","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":8,"money":2,"name":"_NAME2","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":9,"money":0,"name":"_NAME0","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":10,"money":1,"name":"_NAME1","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":11,"money":2,"name":"_NAME2","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":12,"money":3,"name":"_NAME3","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":13,"money":0,"name":"_NAME0","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":14,"money":1,"name":"_NAME1","updateTime":1568732428000}
http-nio-8089-exec-2线程{"createTime":1568732428000,"id":15,"money":2,"name":"_NAME2","updateTime":1568732428000}
会一条条的打印出结果,也就是会按照一条条的返回结果,这样,就避免了查询的数据量过大导致系统OOM
接下来我们分析下源码,看看mybaits到底是怎样实现的
demo很简单,可以看到已经实现了基本查询,接下来,我们一起来看一下源码,myabtis到底是怎样实现的,再看流式查询是如何实现之前,我们先来复习下前面的内容
我们已经知道sqSession有三个实现类
1:DefaultSqlSession,(sqlSession的默认实现,非线程安全)
2:SqlSessionManager,(sqlSession管理器,通过ThreadLocal实现线程安全的sqlSession)
3:SqlSessionTemplete(spring整合myabtis提供的SqlSession模板,由于现在基本上咱们在用mybatis的时候,都不会单独的只用mybais框架,都会引入spring框架,所以我认为
此类就是为了mybatis整合spring提供的专门的模板类)
那大家又要问了.DefaultSqlSession为什么线程不安全,我们再来复习下前面的sql语句的执行过程,
1:确定当前执行的sql语句是查询还是插入或者是更新(查询为例)
2:调用执行器的querry方法,myabtis默认的是 SimpleExecutor执行器
3:SimpleExecutor执行器执行的时候会调用baseExecuter
4:调用queryFromDatabase方法查询数据库,如果命中缓存,直接从缓存中返回数据,如果未命中,查询数据库,并放入缓存
线程不安全就是出现在缓存这一块baseExecuter中定义的localCache是hashMap,hashMap就是线程不安全的,试想一个过程
线程A查询数据库,假设查询出了10条数据,就在将要放入缓存的时候,线程B进来了,然后这个时候,插入了一条数据,查询出了11条数据
更新了缓存,然后线程A这个时候再去更新缓存,将数据重新更新为10条,这样就导致了后续的查询都使用的是线程1的缓存,导致查询结果不正确。
那吗myabtis是如何解决这个问题的那
接下来就是SqlSessionManager和SqlSessionTemplete出场了
咱们先来分析SqlSessionTemplete这个myabtis为spring专门生成的模板类
#SqlSessionTemplate
public class SqlSessionTemplate implements SqlSession, DisposableBean {
//session工厂
private final SqlSessionFactory sqlSessionFactory;
//执行器类型
private final ExecutorType executorType;
//sqlSession
private final SqlSession sqlSessionProxy;
//异常转换器
private final PersistenceExceptionTranslator exceptionTranslator;
public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
}
public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
this(sqlSessionFactory, executorType,
new MyBatisExceptionTranslator(
sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true));
}
//构造函数
public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");
notNull(executorType, "Property 'executorType' is required");
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
//1.使用代理的方式获取sqlSession的代理类实例
this.sqlSessionProxy = (SqlSession) newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class },
//创建SqlSessionInterceptor类实现InvocationHandler 接口,这是处理selSesison的核心
new SqlSessionInterceptor());
}
//...省略部分方法
//2需要将MyBatis方法调用路由到从Spring的事务管理器获得的适当SqlSession的代理*还可以将{@code Method#invoke(Object,Object ...)}
//引发的异常解包以*将{@code PersistenceException}传递给{@code PersistenceExceptionTranslator}。
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1:获取SqlSession这个方法可以根据Spring的事物上下文来获取事物范围内的sqlSession
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
//2:调用从Spring的事物上下文获取事物范围内的sqlSession对象
Object result = method.invoke(sqlSession, args);
//校验当前的sqlSession是否被Spring管理 如果未被Spring托管则自动commit
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
//如果出现异常则根据情况转换后抛出
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
// release the connection to avoid a deadlock if the translator is no loaded. See issue #22
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
//3:关闭sqlSession
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
}
首先当我们启动项目的时候,会加载sqlSessionTemplete的构造函数,为sqlSession生成代理类
接下来重点分析方法1和方法3
1:getSqlSession
3:closeSqlSession
getSqlSession
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory) {
//获取执行器类型
ExecutorType executorType = sessionFactory.getConfiguration().getDefaultExecutorType();
//调用重载方法
return getSqlSession(sessionFactory, executorType, null);
}
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);
//获取sqlSession的持有者对象
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
//从SqlSessionHolder对象中获取SqlSession对象
SqlSession session = sessionHolder(executorType, holder);
//不为null直接返回session,为null的话,执行下面逻辑
if (session != null) {
return session;
}
LOGGER.debug(() -> "Creating a new SqlSession");
//如果当前事物管理器中获取不到SqlSessionHolder对象就重新创建一个
session = sessionFactory.openSession(executorType);
//将新创建的SqlSessionHolder对象注册到TransactionSynchronizationManager中
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
上面方法中出现了一个新的类TransactionSynchronizationManager(事物同步管理器类),大胆猜想sqlSession的线程安全就应该在这里啦,我们去看看这个类
TransactionSynchronizationManager
//获取绑定到当前线程的对应的键的资源
public static Object getResource(Object key) {
//对包资源执行解析
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
//获取资源
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
doGetResource
private static Object doGetResource(Object actualKey) {
//从resources中获取map集合,resources定义为一个threadLocal里面的对象是一个map
Map<Object, Object> map = resources.get();
//如果map为null直接返回
if (map == null) {
return null;
}
//获取资源对应的值,这里的map中存储的key和value到底是什么那
Object value = map.get(actualKey);
// 删除无效的资源持有者
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
这里咱们有个问题,resources这个ThreadLocal里面的map中的key和value到底存储的是什么那,继续往下看
registerSessionHolder
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
//判断当前的ThreadLocal中是否存在对应的TransactionSynchronization
if (TransactionSynchronizationManager.isSynchronizationActive()) {
//从sessionFactory对象中获取Environment对象
Environment environment = sessionFactory.getConfiguration().getEnvironment();
//判断事物工厂是不是有spring事物管理
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");
//构建sqlSession持有者对象
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
//将指定键的指定资源绑定到当前线程
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
//为当前线程注册新的事物
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
//设置事物同步
holder.setSynchronizedWithTransaction(true);
//注册成功后,表明有人拿到了资源,就将引用计数执行+1操作
holder.requested();
} else {
//从datasource获取资源,如果为null,证明是非事物的,否则抛出异常
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session + "] was not registered for synchronization because synchronization is not active");
}
}
这里有5个步骤,其中2和3步骤我们具体分析下,其他步骤就不做具体分析了
1:构建sqlSession持有者对象
2:将指定键的指定资源绑定到当前线程
3:为当前线程注册新的事物
4:设置事物同步
5:执行引用计数+1
bindResource
public static void bindResource(Object key, Object value) throws IllegalStateException {
//执行资源解析
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
//从resources获取map集合(resources是一个ThreadLocal对象)
Map<Object, Object> map = resources.get();
// 向threadLocal中设置空map集合
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
//将sessionFactory作为key,SqlSessionHolder作为value放入map集合中
Object oldValue = map.put(actualKey, value);
// 如果sqlSession持有者已经存在,则设置为null
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
//不为null,则抛出异常,已经存在sqlSesison
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}
registerSynchronization
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
//判断TransactionSynchronization是否为null
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
//将synchronization添加到ThreadLocal对象的set集合中
synchronizations.get().add(synchronization);
}
closeSqlSession
public static void closeSqlSession(SqlSession session, SqlSessionFactory sessionFactory) {
notNull(session, NO_SQL_SESSION_SPECIFIED);
notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
//获取sqlSession的持有者
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
//判断sqlSession的持有者是否为null
if ((holder != null) && (holder.getSqlSession() == session)) {
LOGGER.debug(() -> "Releasing transactional SqlSession [" + session + "]");
//将sqlSession持有者的引用计数减一
holder.released();
} else {
//如果不是被spring管理,那么就不会被Spring去关闭回收,就需要自己close
LOGGER.debug(() -> "Closing non transactional SqlSession [" + session + "]");
session.close();
}
}
写到这里,会不会感觉有点多余,不是流式查询吗,怎嘛分析到这里了,总的来说,流式查询的代码很简单,咱们可以直接使用,但是还是想在分析流式查询之前再去分析下这几个包装类
我这里做个大胆的猜想:
mybatis提供的defaultSqlSesison线程不安全,所以引入了sqlSessionManager来使得sqlSession线程安全,但是现在我们在使用mybaits的时候,都是配合spring一起使用的
所以出现了一个为spring分装的sqlSessionTemplete使用ThreadLocal的方式使得sqlSession线程安全,这样的话,sqlSessionManager这个类就不会被使用了,不知道猜测对不对,
接下来,还有一个实现了sqlSession的类SqlSessionManager,我们在一起分析下
public class SqlSessionManager implements SqlSessionFactory, SqlSession {
private final SqlSessionFactory sqlSessionFactory;
private final SqlSession sqlSessionProxy;
private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<SqlSession>();
private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
//使用代理的方式获取sqlSession
this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[]{SqlSession.class},
new SqlSessionInterceptor());
}
//省略部分代码....
private class SqlSessionInterceptor implements InvocationHandler {
public SqlSessionInterceptor() {
// Prevent Synthetic Access
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//从threadlocal获取当前线程对应的sqlSession
final SqlSession sqlSession = SqlSessionManager.this.localSqlSession.get();
//执行代理方法最终执行sql语句
if (sqlSession != null) {
try {
return method.invoke(sqlSession, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
} else {
//如果当前线程中没有sqlSession,就调用SqlSessionFactory创建一个sqlSession
final SqlSession autoSqlSession = openSession();
try {
//执行sql语句
final Object result = method.invoke(autoSqlSession, args);
//执行事务提交
autoSqlSession.commit();
return result;
} catch (Throwable t) {
//出现异常执行回滚
autoSqlSession.rollback();
throw ExceptionUtil.unwrapThrowable(t);
} finally {
autoSqlSession.close();
}
}
}
}
}
以上就是我对defaultSqlSession,sqlSessionManager,sqlSessionTemplete的一点分析,接下来,我们进入到今天的主题,流式查询的源码分析中
由于我的项目是引入了spring框架,所以当我执行sql语句的时候,会被sqlSessionTemplete的拦截器拦截,执行sqlSession的获取操作,
接下来,我们进入sqlSessionTemplete的流式查询的select方法
我们看到存在三个重载方法
String statement, 执行的sql语句的mapper映射方法
Object parameter,参数对象
RowBounds rowBounds,分页
ResultHandler handler结果处理器
@Override
public void select(String statement, ResultHandler handler) {
this.sqlSessionProxy.select(statement, handler);
}
/**
- {@inheritDoc}
*/
@Override
public void select(String statement, Object parameter, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, handler);
}
/**
- {@inheritDoc}
*/
@Override
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
}
//接着往下分析,看一看mybatis的流式查询到底是怎嘛实现的,我们执行下以下方法,
1:首先被sqlSessionTemplete的拦截器拦截,获取线程安全的sqlSession
sqlSessionTemplate.select("selectTest", 5000, resultContext -> {
final RyxAccount ryxAccount = (RyxAccount) resultContext.getResultObject();
System.out.println(Thread.currentThread().getName()+"线程"+JSON.toJSONString(ryxAccount));
});
2:执行以下重载方法
@Override
public void select(String statement, Object parameter, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, handler);
}
3:按照之前的分析,mybatis的默认实现是DefaultsqlSession,进入DefaultsqlSession的select方法
@Override
public void select(String statement, Object parameter, ResultHandler handler) {
//进入重载方法
select(statement, parameter, RowBounds.DEFAULT, handler);
}
@Override
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
try {
//获取statement对象
MappedStatement ms = configuration.getMappedStatement(statement);
//执行数据库查询操作
executor.query(ms, wrapCollection(parameter), rowBounds, handler);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
4:进入CacheExecuter的querry方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
//获取sql语句
BoundSql boundSql = ms.getBoundSql(parameterObject);
//创建缓存
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
//调用querry方法
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
//查看缓存是否为空
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
5:进入BaseExecuter的querry方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
//获取结果集
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
//判断是否在缓存中
if (list != null) {
//处理本地缓存的输出参数
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
//查询数据库
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
//向缓存中添加标志位
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
//执行默认执行器的doQuerry方法
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
6:进入SimpleExecuter的doQuerry方法
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
7:进入PreparedStatementHandler的Querry方法
@Override
public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
return resultSetHandler.<E> handleResultSets(ps);
}
8:进入DefaultResultSetHandler的handleResultSets方法
@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
final List<Object> multipleResults = new ArrayList<Object>();
int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);
List<ResultMap> resultMaps = mappedStatement.getResultMaps();
//结果映射数
int resultMapCount = resultMaps.size();
//检查结果映射数量
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
//获取结果映射
ResultMap resultMap = resultMaps.get(resultSetCount);
//处理映射结果集
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
//结果集数量加1,
resultSetCount++;
}
String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}
return collapseSingleResultList(multipleResults);
}
//处理结果集
private void handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List<Object> multipleResults, ResultMapping parentMapping) throws SQLException {
try {
if (parentMapping != null) {
//处理父映射
handleRowValues(rsw, resultMap, null, RowBounds.DEFAULT, parentMapping);
} else {
if (resultHandler == null) {
DefaultResultHandler defaultResultHandler = new DefaultResultHandler(objectFactory);
handleRowValues(rsw, resultMap, defaultResultHandler, rowBounds, null);
multipleResults.add(defaultResultHandler.getResultList());
} else {
handleRowValues(rsw, resultMap, resultHandler, rowBounds, null);
}
}
} finally {
// issue #228 (close resultsets)
closeResultSet(rsw.getResultSet());
}
}
//处理行值
public void handleRowValues(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException {
if (resultMap.hasNestedResultMaps()) {
ensureNoRowBounds();
checkResultHandler();
//处理嵌套结果集映射
handleRowValuesForNestedResultMap(rsw, resultMap, resultHandler, rowBounds, parentMapping);
} else {
//处理简单结果集映射
handleRowValuesForSimpleResultMap(rsw, resultMap, resultHandler, rowBounds, parentMapping);
}
}
//处理简单结果集的行值
private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping)
throws SQLException {
DefaultResultContext<Object> resultContext = new DefaultResultContext<Object>();
skipRows(rsw.getResultSet(), rowBounds);
while (shouldProcessMoreRows(resultContext, rowBounds) && rsw.getResultSet().next()) {
ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(rsw.getResultSet(), resultMap, null);
Object rowValue = getRowValue(rsw, discriminatedResultMap);
//存储对象
storeObject(resultHandler, resultContext, rowValue, parentMapping, rsw.getResultSet());
}
}
//存储对象方法
private void storeObject(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue, ResultMapping parentMapping, ResultSet rs) throws SQLException {
if (parentMapping != null) {
linkToParents(rs, parentMapping, rowValue);
} else {
//调用结果处理程序
callResultHandler(resultHandler, resultContext, rowValue);
}
}
//调用结果处理方法
private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) {
resultContext.nextResultObject(rowValue);
((ResultHandler<Object>) resultHandler).handleResult(resultContext);
}
9:进入DefaultResultContext的nextResultObject方法
public void nextResultObject(T resultObject) {
//行值+1原来最终的处理在这里,真的踏破铁鞋无觅处得来全不费工夫
resultCount++;
//返回结果对象
this.resultObject = resultObject;
}
myabtis的流式查询就弄完了,啰啰嗦嗦的说了好多,会有很多不足的地方,万望指教,下一期我们分析下mybatis的游标查询
Thanks