在sql路由过程中,是通过sql路由引擎ShardingRouter完成的,定义如下:
public interface ShardingRouter {
//解析
SQLStatement parse(String logicSQL, boolean useCache);
//路由
SQLRouteResult route(String logicSQL, List<Object> parameters, SQLStatement sqlStatement);
}
可以看到,sql路由解析分为两块,一块做解析,一块做路由。实现类有两个,一个是指定数据库路由DatabaseHintSQLRouter,一个是解析sql路由ParsingSQLRouter。
sql路由在StatementRoutingEngine中被调用,根据参数,选择不同的sql路由解析器解析和做路由。
public final class StatementRoutingEngine {
private final ShardingRouter shardingRouter;
private final ShardingMasterSlaveRouter masterSlaveRouter;
public StatementRoutingEngine(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
//创建sharding路由引擎
shardingRouter = ShardingRouterFactory.createSQLRouter(shardingRule, shardingMetaData, databaseType, showSQL);
//主备数据库路由引擎
masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
}
public SQLRouteResult route(final String logicSQL) {
//语法解析
SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
//路由规则路由
return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
}
}
这里以sql语法路由解析器为例,看一下sharding-sphere如何做解析。
//ParsingSQLRouter.java
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
GeneratedKey generatedKey = null;
//判断是不是insert
if (sqlStatement instanceof InsertStatement) {
//又没有自动生成的键
generatedKey = getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters);
}
//路由结果
SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey);
//优化器,sql优化器优化条件
ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey).optimize();
if (null != generatedKey) {
setGeneratedKeys(result, generatedKey);
}
//路由
RoutingResult routingResult = route(parameters, sqlStatement, shardingConditions);
//初始化sql重写引擎
SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
boolean isSingleRouting = routingResult.isSingleRouting();
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
}
//sql重写
SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
//组装表执行单元
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
}
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits());
}
//返回结果
return result;
}
可以看到,在sql路由解析过程中,经过了四个过程,一个是sql优化器优化sql查询或插入条件,路由,sql重写,最后组装执行单元并返回。
这里重点关注路由,sql优化和sql重写后续再看。
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement, final ShardingConditions shardingConditions) {
//获取表名列表
Collection<String> tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
//use 语句
if (sqlStatement instanceof UseStatement) {
routingEngine = new IgnoreRoutingEngine();
//ddl 语句
} else if (sqlStatement instanceof DDLStatement) {
//表广播模式,所有表都要处理
routingEngine = new TableBroadcastRoutingEngine(shardingRule, sqlStatement);
} else if (sqlStatement instanceof ShowDatabasesStatement || sqlStatement instanceof ShowTablesStatement) {
//数据库广播模式 所有库都要广播接收消息并处理
routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
} else if (shardingConditions.isAlwaysFalse()) {
//如果分片条件不需要调整,则一直使用广播模式
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (sqlStatement instanceof DALStatement) {
//如果是dal语句,则走广播模式
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (tableNames.isEmpty() && sqlStatement instanceof SelectStatement) {
//如果表名称为空,或者是查询语句,则走广播模式
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (tableNames.isEmpty()) {
//若表名为空,则走数据库广播模式
routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
} else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
//如果只有一个表,或者是所有表都走默认数据源,则走标准路由引擎
routingEngine = new StandardRoutingEngine(shardingRule, tableNames.iterator().next(), shardingConditions);
} else {
// TODO config for cartesian set
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, shardingConditions);
}
//路由
return routingEngine.route();
}
可以看到,在路由过程中,判断sqlStatement的类型,和数据库实体表类型选择不同的sql路由引擎路由。
routingEngine.png
以UnicastRoutingEngine路由为例,看一下路由过程:
public final class UnicastRoutingEngine implements RoutingEngine {
private final ShardingRule shardingRule;
private final Collection<String> logicTables;
@Override
public RoutingResult route() {
RoutingResult result = new RoutingResult();
if (logicTables.isEmpty()) {
//如果表为null,则组装表数据单元,不组装RoutingTable
result.getTableUnits().getTableUnits().add(new TableUnit(shardingRule.getShardingDataSourceNames().getDataSourceNames().iterator().next()));
} else if (1 == logicTables.size()) {
//如果只有一张表,则组装表数据单元,和单个表的路由表规则RoutingTable
String logicTableName = logicTables.iterator().next();
DataNode dataNode = shardingRule.findDataNode(logicTableName);
TableUnit tableUnit = new TableUnit(dataNode.getDataSourceName());
tableUnit.getRoutingTables().add(new RoutingTable(logicTableName, dataNode.getTableName()));
result.getTableUnits().getTableUnits().add(tableUnit);
} else {
//如果多个实体表,则组装表数据单元,和单个表的路由表规则RoutingTable
String dataSourceName = null;
List<RoutingTable> routingTables = new ArrayList<>(logicTables.size());
for (String each : logicTables) {
DataNode dataNode = shardingRule.findDataNode(dataSourceName, each);
routingTables.add(new RoutingTable(each, dataNode.getTableName()));
if (null == dataSourceName) {
dataSourceName = dataNode.getDataSourceName();
}
}
TableUnit tableUnit = new TableUnit(dataSourceName);
//添加RoutingTables
tableUnit.getRoutingTables().addAll(routingTables);
result.getTableUnits().getTableUnits().add(tableUnit);
}
return result;
}
}
可以看到,如果是广播模式,会根据实体表的数量,组装不同的RoutingTable。
再看一个标准路由引擎
public final class StandardRoutingEngine implements RoutingEngine {
private final ShardingRule shardingRule;
private final String logicTableName;
private final ShardingConditions shardingConditions;
@Override
public RoutingResult route() {
//获取表规则
TableRule tableRule = shardingRule.getTableRule(logicTableName);
//获取数据库分片策略
Collection<String> databaseShardingColumns = shardingRule.getDatabaseShardingStrategy(tableRule).getShardingColumns();
//获取表分片策略
Collection<String> tableShardingColumns = shardingRule.getTableShardingStrategy(tableRule).getShardingColumns();
Collection<DataNode> routedDataNodes = new LinkedHashSet<>();
//如果是指定数据库类型
if (HintManagerHolder.isUseShardingHint()) {
List<ShardingValue> databaseShardingValues = getDatabaseShardingValuesFromHint(databaseShardingColumns);
List<ShardingValue> tableShardingValues = getTableShardingValuesFromHint(tableShardingColumns);
Collection<DataNode> dataNodes = route(tableRule, databaseShardingValues, tableShardingValues);
for (ShardingCondition each : shardingConditions.getShardingConditions()) {
if (each instanceof InsertShardingCondition) {
((InsertShardingCondition) each).getDataNodes().addAll(dataNodes);
}
}
routedDataNodes.addAll(dataNodes);
} else {
//如果分片条件为null
if (shardingConditions.getShardingConditions().isEmpty()) {
//所有节点添加数据节点
routedDataNodes.addAll(route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList()));
} else {
for (ShardingCondition each : shardingConditions.getShardingConditions()) {
List<ShardingValue> databaseShardingValues = getShardingValues(databaseShardingColumns, each);
List<ShardingValue> tableShardingValues = getShardingValues(tableShardingColumns, each);
//路由
Collection<DataNode> dataNodes = route(tableRule, databaseShardingValues, tableShardingValues);
routedDataNodes.addAll(dataNodes);
if (each instanceof InsertShardingCondition) {
((InsertShardingCondition) each).getDataNodes().addAll(dataNodes);
}
}
}
}
return generateRoutingResult(routedDataNodes);
}
private Collection<DataNode> route(final TableRule tableRule, final List<ShardingValue> databaseShardingValues, final List<ShardingValue> tableShardingValues) {
//路由数据库
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
Collection<DataNode> result = new LinkedList<>();
for (String each : routedDataSources) {
result.addAll(routeTables(tableRule, each, tableShardingValues));
}
return result;
}
private List<ShardingValue> getDatabaseShardingValuesFromHint(final Collection<String> shardingColumns) {
List<ShardingValue> result = new ArrayList<>(shardingColumns.size());
for (String each : shardingColumns) {
Optional<ShardingValue> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(logicTableName, each));
if (shardingValue.isPresent()) {
result.add(shardingValue.get());
}
}
return result;
}
private List<ShardingValue> getTableShardingValuesFromHint(final Collection<String> shardingColumns) {
List<ShardingValue> result = new ArrayList<>(shardingColumns.size());
for (String each : shardingColumns) {
Optional<ShardingValue> shardingValue = HintManagerHolder.getTableShardingValue(new ShardingKey(logicTableName, each));
if (shardingValue.isPresent()) {
result.add(shardingValue.get());
}
}
return result;
}
private List<ShardingValue> getShardingValues(final Collection<String> shardingColumns, final ShardingCondition shardingCondition) {
List<ShardingValue> result = new ArrayList<>(shardingColumns.size());
for (ShardingValue each : shardingCondition.getShardingValues()) {
if (logicTableName.equals(each.getLogicTableName()) && shardingColumns.contains(each.getColumnName())) {
result.add(each);
}
}
return result;
}
private Collection<String> routeDataSources(final TableRule tableRule, final List<ShardingValue> databaseShardingValues) {
Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
if (databaseShardingValues.isEmpty()) {
return availableTargetDatabases;
}
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));
Preconditions.checkState(!result.isEmpty(), "no database route info");
return result;
}
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<ShardingValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
//组装路由结果
private RoutingResult generateRoutingResult(final Collection<DataNode> routedDataNodes) {
RoutingResult result = new RoutingResult();
for (DataNode each : routedDataNodes) {
//新增表单元
TableUnit tableUnit = new TableUnit(each.getDataSourceName());
//添加路由表
tableUnit.getRoutingTables().add(new RoutingTable(logicTableName, each.getTableName()));
result.getTableUnits().getTableUnits().add(tableUnit);
}
return result;
}
}
master-slave路由,根据规则路由。
public SQLRouteResult route(final SQLRouteResult sqlRouteResult) {
for (MasterSlaveRule each : masterSlaveRules) {
route(each, sqlRouteResult);
}
return sqlRouteResult;
}
private void route(final MasterSlaveRule masterSlaveRule, final SQLRouteResult sqlRouteResult) {
Collection<SQLExecutionUnit> toBeRemoved = new LinkedList<>();
Collection<SQLExecutionUnit> toBeAdded = new LinkedList<>();
for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
//如果不是配置的数据源名称,则忽略
if (!masterSlaveRule.getName().equalsIgnoreCase(each.getDataSource())) {
continue;
}
//删除该执行单元
toBeRemoved.add(each);
//如果是主
if (isMasterRoute(sqlRouteResult.getSqlStatement().getType())) {
//主节点visited
MasterVisitedManager.setMasterVisited();
toBeAdded.add(new SQLExecutionUnit(masterSlaveRule.getMasterDataSourceName(), each.getSqlUnit()));
} else {
//新增sql执行单元
toBeAdded.add(new SQLExecutionUnit(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())), each.getSqlUnit()));
}
}
//删除不需要执行的单元
sqlRouteResult.getExecutionUnits().removeAll(toBeRemoved);
//添加需要新增的执行单元
sqlRouteResult.getExecutionUnits().addAll(toBeAdded);
}