执行引擎,顾名思义,就是执行SQL语句的引擎。下面先从入口类开始分析。
1.入口ExecutorEngine
/**
 * Entry point of the executor engine: runs all input groups and merges their results.
 *
 * @param executionGroupContext context holding the execution groups to run
 * @param firstCallback callback used for the first group (falls back to {@code callback} when null)
 * @param callback callback used for the remaining groups
 * @param serial true to run groups one by one on the calling thread, false to run them in parallel
 * @return merged execution results, empty list when there is nothing to execute
 * @throws SQLException when any group execution fails
 */
public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
                              final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
    // Nothing to execute — short-circuit with an immutable empty result.
    if (executionGroupContext.getInputGroups().isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<ExecutionGroup<I>> groups = executionGroupContext.getInputGroups().iterator();
    // serial selects single-threaded vs. multi-threaded execution;
    // a plain SELECT typically takes the parallel path.
    if (serial) {
        return serialExecute(groups, firstCallback, callback);
    }
    return parallelExecute(groups, firstCallback, callback);
}
2.并行执行的方法
/**
 * Runs the first execution group synchronously on the calling thread while the
 * remaining groups are submitted asynchronously, then merges all results.
 *
 * @param executionGroups iterator over the groups to execute (must be non-empty)
 * @param firstCallback callback for the first group; when null, {@code callback} is used instead
 * @param callback callback for the asynchronously executed groups
 * @return merged results of all groups
 * @throws SQLException when execution fails
 */
private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
    ExecutionGroup<I> firstGroup = executionGroups.next();
    ExecutorCallback<I, O> callbackForFirst = null == firstCallback ? callback : firstCallback;
    // Submit the rest of the groups BEFORE executing the first one, so they run concurrently.
    Collection<ListenableFuture<Collection<O>>> restFutures = asyncExecute(executionGroups, callback);
    Collection<O> firstResults = syncExecute(firstGroup, callbackForFirst);
    return getGroupResults(firstResults, restFutures);
}
3.异步执行回调JDBCExecutorCallback
/**
 * Executes every JDBC execution unit in turn and collects the non-null results.
 *
 * @param executionUnits units to execute
 * @param isTrunkThread whether this call runs on the trunk (caller) thread
 * @param dataMap shared data map propagated to each execution
 * @return results of all units, null results filtered out
 * @throws SQLException when a single execution fails and is rethrown
 */
public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
    // TODO It is better to judge whether need sane result before execute, can avoid exception thrown
    Collection<T> results = new LinkedList<>();
    for (JDBCExecutionUnit executionUnit : executionUnits) {
        T singleResult = execute(executionUnit, isTrunkThread, dataMap);
        if (null == singleResult) {
            continue;
        }
        results.add(singleResult);
    }
    return results;
}
4.具体的执行方法
/**
 * Executes a single JDBC execution unit, wrapping the real SQL execution with
 * SPI execution hooks so tracing/monitoring extensions can observe start,
 * success and failure.
 *
 * @param jdbcExecutionUnit unit carrying the SQL unit and its storage resource
 * @param isTrunkThread whether this call runs on the trunk (caller) thread
 * @param dataMap shared data map passed to the hooks and to result reporting
 * @return execution result; null when a non-trunk thread fails, or when the
 *         exception handler chose not to rethrow
 * @throws SQLException when execution fails and the handler rethrows it
 */
private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
// Propagate the configured exception-thrown flag into this (possibly worker) thread.
SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData());
// SPI-loaded hook chain; implementations are discovered at runtime.
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
try {
SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
// Execute the SQL and capture the execution result.
T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
sqlExecutionHook.finishSuccess();
finishReport(dataMap, jdbcExecutionUnit);
return result;
} catch (final SQLException ex) {
// Non-trunk threads swallow the failure; only the trunk thread decides how to surface it.
if (!isTrunkThread) {
return null;
}
// A "sane" fallback result (when available) masks the failure for this statement.
// NOTE(review): finishFailure/handleException are skipped on the sane-result and
// non-trunk paths — confirm this is intended.
Optional<T> saneResult = getSaneResult(sqlStatement);
if (saneResult.isPresent()) {
return saneResult.get();
}
sqlExecutionHook.finishFailure(ex);
// May rethrow ex depending on the exception-thrown flag set at the top of this method.
SQLExecutorExceptionHandler.handleException(ex);
return null;
}
}
5.获取数据源元数据,这里按JDBC URL做了缓存,避免重复解析
/**
 * Resolves the data source metadata for a JDBC connection, caching it per JDBC URL
 * so the URL is parsed only once.
 *
 * @param metaData JDBC database metadata of the target connection
 * @return parsed data source metadata for the connection's URL
 * @throws SQLException when reading the URL or user name from JDBC metadata fails
 */
private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData metaData) throws SQLException {
    String cacheKey = metaData.getURL();
    // Populate the cache on first sight of this URL; subsequent calls reuse the entry.
    if (!CACHED_DATASOURCE_METADATA.containsKey(cacheKey)) {
        CACHED_DATASOURCE_METADATA.put(cacheKey, databaseType.getDataSourceMetaData(cacheKey, metaData.getUserName()));
    }
    return CACHED_DATASOURCE_METADATA.get(cacheKey);
}