归并引擎就是将多个查询结果合并的引擎,我们先从入口类MergeEngine开始
1.先看下它的属性有哪些
// Database type (the original note gives MySQL as the example).
private final DatabaseType databaseType;
// Object carrying information about the database schema.
private final ShardingSphereSchema schema;
// Configuration properties.
private final ConfigurationProperties props;
2.将QueryResult根据不同设置进行合并
/**
 * Merges the given query results and applies decoration.
 *
 * <p>If {@code executeMerge} yields a merged result, that result is decorated and returned.
 * Otherwise the first query result is decorated directly; when that also yields nothing,
 * the first query result is wrapped in a {@code TransparentMergedResult}.
 *
 * @param queryResults query results to merge; the first element is used as the fallback
 * @param sqlStatementContext SQL statement context passed to the merge and decorate steps
 * @return the merged (and possibly decorated) result
 * @throws SQLException if merging or decorating fails
 */
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
    Optional<MergedResult> merged = executeMerge(queryResults, sqlStatementContext);
    Optional<MergedResult> decorated;
    if (merged.isPresent()) {
        decorated = Optional.of(decorate(merged.get(), sqlStatementContext));
    } else {
        decorated = decorate(queryResults.get(0), sqlStatementContext);
    }
    if (decorated.isPresent()) {
        return decorated.get();
    }
    // Nothing merged and nothing decorated: pass the first query result through unchanged.
    return new TransparentMergedResult(queryResults.get(0));
}
3.执行合并的方法,返回合并结果
/**
 * Runs the merge using the first registered engine that is a {@code ResultMergerEngine}.
 *
 * @param queryResults query results to merge
 * @param sqlStatementContext SQL statement context used to create the merger and passed to it
 * @return the merged result, or an empty {@code Optional} when no merger engine is registered
 * @throws SQLException if the merger fails
 */
private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
    for (Entry<ShardingSphereRule, ResultProcessEngine> each : engines.entrySet()) {
        if (!(each.getValue() instanceof ResultMergerEngine)) {
            continue;
        }
        ResultMergerEngine mergerEngine = (ResultMergerEngine) each.getValue();
        ResultMerger merger = mergerEngine.newInstance(databaseType, each.getKey(), props, sqlStatementContext);
        // Only the first merger engine found is used; later entries are never consulted.
        MergedResult merged = merger.merge(queryResults, sqlStatementContext, schema);
        return Optional.of(merged);
    }
    return Optional.empty();
}
4.对合并的结果进行再处理
/**
 * Passes a merged result through every registered {@code ResultDecoratorEngine} in turn.
 *
 * <p>Each decorator receives the output of the previous one; while no decorator has produced
 * a non-null output yet, the original {@code mergedResult} is used as the input.
 *
 * @param mergedResult merged result to decorate
 * @param sqlStatementContext SQL statement context used to create and invoke the decorators
 * @return the last decorator's output, or {@code mergedResult} itself when that output is null
 *         or no decorator engine is registered
 * @throws SQLException if a decorator fails
 */
private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
    MergedResult decorated = null;
    for (Entry<ShardingSphereRule, ResultProcessEngine> each : engines.entrySet()) {
        if (!(each.getValue() instanceof ResultDecoratorEngine)) {
            continue;
        }
        ResultDecoratorEngine decoratorEngine = (ResultDecoratorEngine) each.getValue();
        ResultDecorator decorator = decoratorEngine.newInstance(databaseType, schema, each.getKey(), props, sqlStatementContext);
        // Chain onto the previous output if there is one, otherwise start from the original.
        MergedResult input = null != decorated ? decorated : mergedResult;
        decorated = decorator.decorate(input, sqlStatementContext, each.getKey());
    }
    if (null != decorated) {
        return decorated;
    }
    return mergedResult;
}
5.ShardingDQLResultMerger类的decoratePaginationContext方法,根据不同的数据库类型对分页结果进行相应的装饰处理
/**
 * Wraps a merged result in the pagination decorator matching the trunk database type.
 *
 * <p>The result is returned untouched when the statement has no pagination, when there is
 * exactly one query result, or when the trunk database name is none of
 * MySQL, PostgreSQL, Oracle or SQLServer.
 *
 * @param queryResults query results that were merged; only the size is inspected
 * @param selectStatementContext select statement context supplying the pagination context
 * @param mergedResult merged result to wrap
 * @return the decorated result, or {@code mergedResult} when no decoration applies
 * @throws SQLException if constructing the decorator fails
 */
private MergedResult decoratePaginationContext(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
    PaginationContext pagination = selectStatementContext.getPaginationContext();
    boolean needsDecoration = pagination.isHasPagination() && 1 != queryResults.size();
    if (!needsDecoration) {
        return mergedResult;
    }
    String trunkName = DatabaseTypeRegistry.getTrunkDatabaseType(databaseType.getName()).getName();
    MergedResult decorated;
    if ("MySQL".equals(trunkName) || "PostgreSQL".equals(trunkName)) {
        decorated = new LimitDecoratorMergedResult(mergedResult, pagination);
    } else if ("Oracle".equals(trunkName)) {
        decorated = new RowNumberDecoratorMergedResult(mergedResult, pagination);
    } else if ("SQLServer".equals(trunkName)) {
        decorated = new TopAndRowNumberDecoratorMergedResult(mergedResult, pagination);
    } else {
        // Unrecognised dialect: leave the merged result as it is.
        decorated = mergedResult;
    }
    return decorated;
}
6.LimitDecoratorMergedResult类和DecoratorMergedResult类,对限定条件进行合并和筛选
/**
 * Creates a limit-decorating merged result on top of an existing merged result.
 *
 * @param mergedResult merged result handed to the superclass constructor
 * @param pagination pagination context stored on this instance
 * @throws SQLException declared by this constructor; presumably raised by skipOffset() -- TODO confirm
 */
public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
super(mergedResult);
this.pagination = pagination;
// NOTE(review): skipOffset() is not visible here; it presumably reads this.pagination,
// so it must stay after the assignment above -- confirm against its definition.
skipAll = skipOffset();
}