Sharding-JDBC系列
开篇
数据源配置
public final class ShardingTablesConfigurationPrecise implements ExampleConfiguration {
@Override
public DataSource getDataSource() throws SQLException {
ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
shardingRuleConfig.getBroadcastTables().add("t_address");
shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new PreciseModuloShardingTableAlgorithm()));
return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
}
private static TableRuleConfiguration getOrderTableRuleConfiguration() {
TableRuleConfiguration result = new TableRuleConfiguration("t_order", "demo_ds.t_order_${[0, 1]}");
result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_id", getProperties()));
return result;
}
private static TableRuleConfiguration getOrderItemTableRuleConfiguration() {
TableRuleConfiguration result = new TableRuleConfiguration("t_order_item", "demo_ds.t_order_item_${[0, 1]}");
result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_item_id", getProperties()));
return result;
}
private static Map<String, DataSource> createDataSourceMap() {
Map<String, DataSource> result = new HashMap<>();
result.put("demo_ds", DataSourceUtil.createDataSource("demo_ds"));
return result;
}
private static Properties getProperties() {
Properties result = new Properties();
result.setProperty("worker.id", "123");
return result;
}
}
- t_order和t_order_item表按照分表的逻辑进行保存,在同一个database当中保存2个table。
- 我们这里暂时分析t_order表的插入过程,核心应该在于选择t_order_0还是t_order_1。
SQL执行流程
public class OrderRepositoryImpl implements OrderRepository {
private final DataSource dataSource;
// OrderRepositoryImpl的构造函数参数为ShardingDataSource
public OrderRepositoryImpl(final DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public Long insert(final Order order) throws SQLException {
String sql = "INSERT INTO t_order (user_id, address_id, status) VALUES (?, ?, ?)";
// 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象
try (Connection connection = dataSource.getConnection();
// 2、通过connection准备PreparedStatement对象
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
// 3、设置preparedStatement的参数
preparedStatement.setInt(1, order.getUserId());
preparedStatement.setLong(2, order.getAddressId());
preparedStatement.setString(3, order.getStatus());
// 4、执行preparedStatement的executeUpdate
preparedStatement.executeUpdate();
// 5、处理ResultSet对象
try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
if (resultSet.next()) {
order.setOrderId(resultSet.getLong(1));
}
}
}
return order.getOrderId();
}
}
- 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象。
- 2、通过connection准备PreparedStatement对象。
- 3、设置preparedStatement的参数。
- 4、执行preparedStatement的executeUpdate。
- 5、处理ResultSet对象。
- mysql的核心执行流程就是按照上面的步骤执行。
核心变量介绍
ShardingRuntimeContext
public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> {
private final DatabaseMetaData cachedDatabaseMetaData;
// 核心变量ShardingSphereMetaData
private final ShardingSphereMetaData metaData;
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException {
super(rule, props, databaseType);
cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap, rule);
metaData = createMetaData(dataSourceMap, rule, databaseType);
shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
}
}
public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
private final T rule;
private final ShardingProperties props;
private final DatabaseType databaseType;
private final ShardingExecuteEngine executeEngine;
private final SQLParseEngine parseEngine;
protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
this.rule = rule;
this.props = new ShardingProperties(null == props ? new Properties() : props);
this.databaseType = databaseType;
executeEngine = new ShardingExecuteEngine(this.props.<Integer>getValue(ShardingPropertiesConstant.EXECUTOR_SIZE));
parseEngine = SQLParseEngineFactory.getSQLParseEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
ConfigurationLogger.log(rule.getRuleConfiguration());
ConfigurationLogger.log(props);
}
@Override
public void close() throws Exception {
executeEngine.close();
}
}
- ShardingRuntimeContext作为分片上下文包含各类核心变量。
- ShardingSphereMetaData包含分片元数据。
- executeEngine为执行引擎。
- parseEngine为解析引擎。
ShardingDataSource
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
}
- ShardingDataSource的getConnection返回ShardingConnection对象。
ShardingConnection
public final class ShardingConnection extends AbstractConnectionAdapter {
private final Map<String, DataSource> dataSourceMap;
private final ShardingRuntimeContext runtimeContext;
private final TransactionType transactionType;
private final ShardingTransactionManager shardingTransactionManager;
public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
this.dataSourceMap = dataSourceMap;
this.runtimeContext = runtimeContext;
this.transactionType = transactionType;
shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
}
}
- ShardingConnection的prepareStatement返回ShardingPreparedStatement对象。
ShardingPreparedStatement
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
// 待执行的SQL
private final String sql;
// 路由引擎
private final PreparedQueryShardingEngine shardingEngine;
// 执行引擎
private final PreparedStatementExecutor preparedStatementExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private SQLRouteResult sqlRouteResult;
private ResultSet currentResultSet;
public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
}
private ShardingPreparedStatement(
final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
// 核心的是PreparedQueryShardingEngine对象
shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
}
}
- ShardingPreparedStatement对象的包含传入的sql和connection对象。
- ShardingPreparedStatement的核心变量包含shardingEngine、preparedStatementExecutor、batchPreparedStatementExecutor。
- shardingEngine负责执行分片选择对应的库表。
- preparedStatementExecutor负责执行具体的SQL。
PreparedQueryShardingEngine
public final class PreparedQueryShardingEngine extends BaseShardingEngine {
private final PreparedStatementRoutingEngine routingEngine;
public PreparedQueryShardingEngine(final String sql,
final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
super(shardingRule, shardingProperties, metaData);
routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
}
@Override
protected List<Object> cloneParameters(final List<Object> parameters) {
return new ArrayList<>(parameters);
}
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
return routingEngine.route(parameters);
}
}
- PreparedQueryShardingEngine包含PreparedStatementRoutingEngine对象。
- PreparedQueryShardingEngine负责执行路由功能,核心通过routingEngine来执行。
PreparedStatementRoutingEngine
public final class PreparedStatementRoutingEngine {
private final String logicSQL;
private final ShardingRouter shardingRouter;
private final ShardingMasterSlaveRouter masterSlaveRouter;
private SQLStatement sqlStatement;
public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
this.logicSQL = logicSQL;
shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
}
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlStatement) {
sqlStatement = shardingRouter.parse(logicSQL, true);
}
return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
}
}
- PreparedStatementRoutingEngine包含shardingRouter和masterSlaveRouter。
- shardingRouter负责执行路由功能。
变量之间关系
- ShardingDataSource负责提供ShardingConnection。
- ShardingConnection负责提供ShardingPreparedStatement。
- ShardingPreparedStatement负责提供PreparedQueryShardingEngine和PreparedStatementExecutor。
- PreparedQueryShardingEngine负责提供PreparedStatementRoutingEngine。
- PreparedStatementRoutingEngine负责提供ShardingRouter。
Statement执行过程
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
private final String sql;
private final PreparedQueryShardingEngine shardingEngine;
private final PreparedStatementExecutor preparedStatementExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private SQLRouteResult sqlRouteResult;
private ResultSet currentResultSet;
@Override
public int executeUpdate() throws SQLException {
try {
// 1、清空之前的查询
clearPrevious();
// 2、执行shard操作
shard();
// 3、初始化PreparedStatementExecutor
initPreparedStatementExecutor();
// 4、执行preparedStatementExecutor的executeUpdate方法
return preparedStatementExecutor.executeUpdate();
} finally {
clearBatch();
}
}
private void clearPrevious() throws SQLException {
preparedStatementExecutor.clear();
}
private void shard() {
// shardingEngine为PreparedQueryShardingEngine
sqlRouteResult = shardingEngine.shard(sql, getParameters());
}
private void initPreparedStatementExecutor() throws SQLException {
preparedStatementExecutor.init(sqlRouteResult);
setParametersForStatements();
replayMethodForStatements();
}
}
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
public int executeUpdate() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
List<Integer> results = executeCallback(executeCallback);
if (isAccumulate()) {
return accumulate(results);
} else {
return results.get(0);
}
}
}
- clearPrevious清空之前的查询。
- shard通过shardingEngine执行分片操作,shardingEngine为PreparedQueryShardingEngine对象。
- initPreparedStatementExecutor负责初始化PreparedStatementExecutor。
- preparedStatementExecutor.executeUpdate执行preparedStatementExecutor的executeUpdate动作。
- 核心关注路由过程和executeUpdate过程。
路由过程
- 整体路由顺序按照PreparedQueryShardingEngine
=> PreparedStatementRoutingEngine => ShardingRouter => StandardRoutingEngine 进行路由。
BaseShardingEngine
public abstract class BaseShardingEngine {
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
// 执行executeRoute
SQLRouteResult result = executeRoute(sql, clonedParameters);
result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
if (showSQL) {
boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
SQLLogger.logSQL(sql, showSimple, result.getSqlStatementContext(), result.getRouteUnits());
}
return result;
}
private SQLRouteResult executeRoute(final String sql, final List<Object> clonedParameters) {
routingHook.start(sql);
try {
SQLRouteResult result = route(sql, clonedParameters);
routingHook.finishSuccess(result, metaData.getTables());
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
routingHook.finishFailure(ex);
throw ex;
}
}
}
- BaseShardingEngine是PreparedQueryShardingEngine的基类。
- BaseShardingEngine代表的是PreparedQueryShardingEngine对象。
- executeRoute执行PreparedQueryShardingEngine的route方法。
PreparedQueryShardingEngine
public final class PreparedQueryShardingEngine extends BaseShardingEngine {
private final PreparedStatementRoutingEngine routingEngine;
public PreparedQueryShardingEngine(final String sql,
final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
super(shardingRule, shardingProperties, metaData);
routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
}
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
return routingEngine.route(parameters);
}
}
- PreparedQueryShardingEngine的routingEngine为PreparedStatementRoutingEngine对象。
- 执行PreparedStatementRoutingEngine的route方法。
PreparedStatementRoutingEngine
public final class PreparedStatementRoutingEngine {
private final String logicSQL;
private final ShardingRouter shardingRouter;
private final ShardingMasterSlaveRouter masterSlaveRouter;
private SQLStatement sqlStatement;
public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
this.logicSQL = logicSQL;
shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
}
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlStatement) {
sqlStatement = shardingRouter.parse(logicSQL, true);
}
return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
}
}
- PreparedStatementRoutingEngine的shardingRouter为ShardingRouter对象。
- 执行ShardingRouter的route方法。
ShardingRouter
public final class ShardingRouter {
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
Optional<ShardingStatementValidator> shardingStatementValidator = ShardingStatementValidatorFactory.newInstance(sqlStatement);
if (shardingStatementValidator.isPresent()) {
shardingStatementValidator.get().validate(shardingRule, sqlStatement, parameters);
}
// 1、创建SQLStatementContext
SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getRelationMetas(), logicSQL, parameters, sqlStatement);
Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
? GeneratedKey.getGenerateKey(shardingRule, metaData.getTables(), parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();
//2、创建ShardingConditions
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, generatedKey.orNull(), metaData.getRelationMetas());
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
checkSubqueryShardingValues(sqlStatementContext, shardingConditions);
mergeShardingConditions(shardingConditions);
}
//3、创建RoutingEngine对象,为StandardRoutingEngine对象。
RoutingEngine routingEngine = RoutingEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions);
//4、 执行流程,核心走的StandardRoutingEngine路由规则
RoutingResult routingResult = routingEngine.route();
if (needMergeShardingValues) {
Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
}
//5、创建SQLRouteResult对象
SQLRouteResult result = new SQLRouteResult(sqlStatementContext, shardingConditions, generatedKey.orNull());
result.setRoutingResult(routingResult);
if (sqlStatementContext instanceof InsertSQLStatementContext) {
setGeneratedValues(result);
}
return result;
}
}
- 1、创建SQLStatementContext对象sqlStatementContext。
- 2、创建ShardingConditions对象shardingConditions。
- 3、创建RoutingEngine对象routingEngine,为StandardRoutingEngine对象。
- 4、执行routingEngine.route()并生成RoutingResult对象routingResult。
- 5、生成SQLRouteResult对象并返回。
- 重点关注RoutingEngine.route()的方法。
StandardRoutingEngine
public final class StandardRoutingEngine implements RoutingEngine {
private final ShardingRule shardingRule;
private final String logicTableName;
private final SQLStatementContext sqlStatementContext;
private final ShardingConditions shardingConditions;
@Override
public RoutingResult route() {
if (isDMLForModify(sqlStatementContext.getSqlStatement()) && !sqlStatementContext.getTablesContext().isSingleTable()) {
throw new ShardingException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
}
return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));
}
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
// 根据Hint去路由
if (isRoutingByHint(tableRule)) {
return routeByHint(tableRule);
}
// 根据条件去路由
if (isRoutingByShardingConditions(tableRule)) {
return routeByShardingConditions(tableRule);
}
// 混个条件去路由
return routeByMixedConditions(tableRule);
}
private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
// 执行routeByShardingConditionsWithCondition方法
return shardingConditions.getConditions().isEmpty()
? route0(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList()) : routeByShardingConditionsWithCondition(tableRule);
}
private Collection<DataNode> routeByShardingConditionsWithCondition(final TableRule tableRule) {
Collection<DataNode> result = new LinkedList<>();
for (ShardingCondition each : shardingConditions.getConditions()) {
// Database维度的分片值,table维度的分片值
Collection<DataNode> dataNodes = route0(tableRule, getShardingValuesFromShardingConditions(shardingRule.getDatabaseShardingStrategy(tableRule).getShardingColumns(), each),
getShardingValuesFromShardingConditions(shardingRule.getTableShardingStrategy(tableRule).getShardingColumns(), each));
each.getDataNodes().addAll(dataNodes);
result.addAll(dataNodes);
}
return result;
}
private Collection<DataNode> route0(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
// 1、先routeDataSources,tableRule是表规则,databaseShardingValues是分库的数据值
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
Collection<DataNode> result = new LinkedList<>();
// 2、再执行routeTables,根据tableRule、dataSource、tableShardingValues是分表的数据值
for (String each : routedDataSources) {
result.addAll(routeTables(tableRule, each, tableShardingValues));
}
return result;
}
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
if (databaseShardingValues.isEmpty()) {
return tableRule.getActualDatasourceNames();
}
// 执行DatabaseShardingStrategy的doSharding
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues));
Preconditions.checkState(!result.isEmpty(), "no database route info");
Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result),
"Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
return result;
}
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
// 执行TableShardingStrategy的doSharding
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
private RoutingResult generateRoutingResult(final Collection<DataNode> routedDataNodes) {
RoutingResult result = new RoutingResult();
for (DataNode each : routedDataNodes) {
RoutingUnit routingUnit = new RoutingUnit(each.getDataSourceName());
routingUnit.getTableUnits().add(new TableUnit(logicTableName, each.getTableName()));
result.getRoutingUnits().add(routingUnit);
}
return result;
}
}
- StandardRoutingEngine的route的核心逻辑在route0方法当中。
- route0的核心逻辑:1、先根据database的分片值和分片规则确定database,在database确定的前提下根据table的分片值和分片规则确定table。
- 结合database和table的分片结果组装返回结果。
- routeDataSources负责实现dataSource的路由。
- routeTables负责实现table的路由。
executeUpdate过程
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
public void init(final SQLRouteResult routeResult) throws SQLException {
setSqlStatementContext(routeResult.getSqlStatementContext());
// 初始化PreparedStatementExecutor的executeGroups对象
getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
cacheStatements();
}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
@Override
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
@Override
public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
}
});
}
public int executeUpdate() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
// 生成SQLExecuteCallback对象
SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
List<Integer> results = executeCallback(executeCallback);
if (isAccumulate()) {
return accumulate(results);
} else {
return results.get(0);
}
}
}
public abstract class AbstractStatementExecutor {
private final SQLExecuteTemplate sqlExecuteTemplate;
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
// 通过sqlExecuteTemplate的executeGroup继续执行
List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
return result;
}
}
public final class SQLExecuteCallbackFactory {
public static SQLExecuteCallback<Integer> getPreparedUpdateSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
return new SQLExecuteCallback<Integer>(databaseType, isExceptionThrown) {
@Override
protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return ((PreparedStatement) statement).executeUpdate();
}
};
}
public static SQLExecuteCallback<Boolean> getPreparedSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
return new SQLExecuteCallback<Boolean>(databaseType, isExceptionThrown) {
@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return ((PreparedStatement) statement).execute();
}
};
}
}
- PreparedStatementExecutor通过sqlExecuteTemplate.executeGroup执行操作。
public final class SQLExecuteTemplate {
private final ShardingExecuteEngine executeEngine;
private final boolean serial;
public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> callback) throws SQLException {
return executeGroup(sqlExecuteGroups, null, callback);
}
public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups,
final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
try {
return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {
ExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
}
- SQLExecuteTemplate通过ShardingExecuteEngine的groupExecute继续执行。
public final class ShardingExecuteEngine implements AutoCloseable {
public <I, O> List<O> groupExecute(
final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial)
throws SQLException {
if (inputGroups.isEmpty()) {
return Collections.emptyList();
}
return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}
private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
result.addAll(syncGroupExecute(each, callback));
}
return result;
}
private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
}
private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (ShardingExecuteGroup<I> each : inputGroups) {
result.add(asyncGroupExecute(each, callback));
}
return result;
}
private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
return executorService.submit(new Callable<Collection<O>>() {
@Override
public Collection<O> call() throws SQLException {
return callback.execute(inputGroup.getInputs(), false, dataMap);
}
});
}
}
分库分表中间结果