数据库中间件 Sharding-JDBC 源码分析 —— SQL 执行

1. 概述

越过千山万水(SQL 解析、SQL 路由、SQL 改写),我们终于来到了 SQL 执行。开森不开森?!

查询语句的程序入口为ShardingPreparedStatement#execute

public boolean execute() throws SQLException {
        try {
            // 路由(包括了 SQL 解析、SQL 路由、SQL 改写)
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            // SQL 执行
            return new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
        } finally {
            clearBatch();
        }
    }

前面的文章已经讲了 SQL 解析、SQL 路由、SQL 改写,本文继续探讨 SQL 执行。

注意:之所以未采用常用的executeQuery,是因为它只支持返回一个结果集ResultSet,不符合分片的场景。

2. ExecutorEngine

ExecutorEngine,SQL执行引擎。

分表分库,需要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:

  • 串行执行 SQL
  • 并行执行 SQL

前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。
后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。

ExecutorEngine 当然采用的是后者,并行执行 SQL。

2.1 ListeningExecutorService

Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供创建 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有如下好处:

我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:
1. 大多数 Futures 方法中需要它。
2. 转到 ListenableFuture 编程比较容易。
3. Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。

传统 JDK中 的 Future 通过异步的方式计算返回结果:在多线程运算中可能在没有结束就返回结果。

ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。

下文我们看 Sharding-JDBC 是如何通过 ListenableFuture 简化并发编程的。

先看 ExecutorEngine 如何初始化 ListeningExecutorService:

public final class ExecutorEngine implements AutoCloseable {
    
    private final ListeningExecutorService executorService;
    
    public ExecutorEngine(final int executorSize) {
        executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
                executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
        MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
    }
    ...
}
  • 一个分片数据源( ShardingDataSource ) 独占 一个 SQL执行引擎( ExecutorEngine )。
  • MoreExecutors#listeningDecorator()创建 ListeningExecutorService,这样 #submit(), #invokeAll() 可以返回 ListenableFuture。
  • 默认情况下,线程池大小为 8。可以根据实际业务需要,设置 ShardingProperties 进行调整。
  • setNameFormat()并发编程时,一定要对线程名字做下定义,这样排查问题会方便很多。
  • MoreExecutors#addDelayedShutdownHook(),应用关闭时,等待所有任务全部完成再关闭。默认配置等待时间为 60 秒,建议将等待时间做成可配的。

2.2 关闭

数据源关闭时,会调用 ExecutorEngine 也进行关闭。

    // ShardingDataSource.java
    @Override
    public void close() {
       executorEngine.close();
    }

    // ExecutorEngine
    @Override
    public void close() {
       executorService.shutdownNow();
       try {
           executorService.awaitTermination(5, TimeUnit.SECONDS);
       } catch (final InterruptedException ignored) {
       }
       if (!executorService.isTerminated()) {
           throw new ShardingJdbcException("ExecutorEngine can not been terminated");
       }
    }
  • shutdownNow() 尝试使用 Thread.interrupt() 打断正在执行中的任务,未执行的任务不再执行。
  • awaitTermination() 因为 #shutdownNow() 打断不是立即结束,需要一个过程,因此这里等待了 5 秒。
  • 等待 5 秒后,线程池不一定已经关闭,此时抛出异常给上层。建议打印下日志,记录出现这个情况。

2.3 执行 SQL 任务

ExecutorEngine 对外暴露executeStatement()executePreparedStatement()executeBatch()三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是execute()私有方法。

// ExecutorEngine.java
private  <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, 
            final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws SQLException {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size());
        // 发布执行之前事件
        EventBusInstance.getInstance().post(event);
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
        BaseStatementUnit firstInput = iterator.next();
        // 第二个任务开始所有 SQL任务 提交线程池【异步】执行任务
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
            // 第一个任务【同步】执行任务
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
            // 等待第二个任务开始所有 SQL任务完成
            restOutputs = restFutures.get();
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            event.setException(ex);
            event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
            // 发布执行失败事件
            EventBusInstance.getInstance().post(event);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
        // 发布执行成功事件
        EventBusInstance.getInstance().post(event);
        // 返回结果
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }

第一个任务【同步】调用executeInternal()执行任务。

    private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
       // 【同步】执行任务
       return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
    }

第二个开始的任务提交线程池异步调用executeInternal()执行任务。

private <T> ListenableFuture<List<T>> asyncExecute(
            final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
        for (final BaseStatementUnit each : baseStatementUnits) {
            // 提交线程池【异步】执行任务
            result.add(executorService.submit(new Callable<T>() {
                
                @Override
                public T call() throws Exception {
                    return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
                }
            }));
        }
        // 返回 ListenableFuture
        return Futures.allAsList(result);
    }

我们注意下Futures.allAsList(result)restOutputs=restFutures.get()。神器 Guava 简化并发编程的好处就提现出来了。 ListenableFuture#get()当所有任务都成功时,返回所有任务执行结果;当任何一个任务失败时,马上抛出异常,无需等待其他任务执行完成。

为什么会分同步执行和异步执行呢?猜测,当 SQL 执行是单表时,只要进行第一个任务的同步调用,性能更加优秀。

// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, 
                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
            List<AbstractExecutionEvent> events = new LinkedList<>();
            if (parameterSets.isEmpty()) {
                // 生成 Event
                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
            }
            for (List<Object> each : parameterSets) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            // EventBus 发布 EventExecutionType.BEFORE_EXECUTE
            for (AbstractExecutionEvent event : events) {
                EventBusInstance.getInstance().post(event);
            }
            try {
                // 执行回调函数
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
                // EventBus 发布 EventExecutionType.EXECUTE_FAILURE
                for (AbstractExecutionEvent each : events) {
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(ex);
                    EventBusInstance.getInstance().post(each);
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            // EventBus 发布 EventExecutionType.EXECUTE_SUCCESS
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }

result=executeCallback.execute(baseStatementUnit) 执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 通过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。

    public interface ExecuteCallback<T> {

        /**
         * 执行任务.
         * 
         * @param baseStatementUnit 语句对象执行单元
         * @return 处理结果
         * @throws Exception 执行期异常
         */
        T execute(BaseStatementUnit baseStatementUnit) throws Exception;
    }

synchronized(baseStatementUnit.getStatement().getConnection()),这里加锁的原因是,虽然 MySQL、Oracle 的 Connection 实现是线程安全的。但是数据库连接池实现的 Connection 不一定是线程安全,例如 Druid 的线程池 Connection 非线程安全。

3. Executor

Executor,执行器,目前一共有三个执行器。不同的执行器对应不同的执行单元 (BaseStatementUnit)。

执行器类 执行器名 执行单元
StatementExecutor 静态语句对象执行单元 StatementUnit
PreparedStatementExecutor 预编译语句对象请求的执行器 PreparedStatementUnit
BatchPreparedStatementExecutor 批量预编译语句对象请求的执行器 BatchPreparedStatementUnit

3.1 StatementExecutor

StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:

  • executeQuery() 执行 SQL 查询
public List<ResultSet> executeQuery() throws SQLException {
        return executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
            
            @Override
            public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
            }
        });
    }
  • executeUpdate() 执行 SQL 更新
public int executeUpdate() throws SQLException {
        return executeUpdate(new Updater() {
            
            @Override
            public int executeUpdate(final Statement statement, final String sql) throws SQLException {
                return statement.executeUpdate(sql);
            }
        });
    }
  • execute() 执行 SQL
public boolean execute() throws SQLException {
        return execute(new Executor() {
            
            @Override
            public boolean execute(final Statement statement, final String sql) throws SQLException {
                return statement.execute(sql);
            }
        });
    }

3.2 PreparedStatementExecutor

PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了parameters参数,方法逻辑上基本一致,就不重复分享啦。

3.3 BatchPreparedStatementExecutor

BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。

// BatchPreparedStatementExecutor.java
public final class BatchPreparedStatementExecutor {
    
    private final ExecutorEngine executorEngine;
    
    private final DatabaseType dbType;
    
    private final SQLType sqlType;
    
    private final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits;
    
    private final List<List<Object>> parameterSets;
    
    /**
     * Execute batch.
     * 
     * @return execute results
     * @throws SQLException SQL exception
     */
    public int[] executeBatch() throws SQLException {
        return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {
            
            @Override
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeBatch();
            }
        }));
    }
    
    // 计算每个语句的更新数量
    private int[] accumulate(final List<int[]> results) {
        int[] result = new int[parameterSets.size()];
        int count = 0;
        // 每个语句按照顺序,读取到其对应的每个分片SQL影响的行数进行累加
        for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
            for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
                int value = null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
                if (DatabaseType.Oracle == dbType) {
                    result[entry.getKey()] = value;
                } else {
                    result[entry.getKey()] += value;
                }
            }
            count++;
        }
        return result;
    }
}

眼尖的同学会发现,为什么有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操作,只能进行 PreparedStatement 的批操作。

4. ExecutionEvent

AbstractExecutionEvent,SQL 执行事件抽象接口。

public abstract class AbstractExecutionEvent {
    // 事件编号
    @Getter
    private final String id = UUID.randomUUID().toString();
    // 事件类型
    @Getter
    @Setter
    private EventExecutionType eventExecutionType = EventExecutionType.BEFORE_EXECUTE;
    
    @Setter
    private Exception exception;
    
    public Optional<? extends Exception> getException() {
        return Optional.fromNullable(exception);
    }
}

AbstractExecutionEvent 的子类关系图为:


  • DMLExecutionEvent:DML类 SQL 执行时事件
  • DQLExecutionEvent:DQL类 SQL 执行时事件

EventExecutionType,事件触发类型。

  • BEFORE_EXECUTE:执行前
  • EXECUTE_SUCCESS:执行成功
  • EXECUTE_FAILURE:执行失败

4.1 EventBus

那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文 ExecutorEngine#executeInternal()我们可以看到每个分片 SQL 执行的过程中会发布相应事件:

  • 执行 SQL 前:发布类型类型为 BEFORE_EXECUTE 的事件
  • 执行 SQL 成功:发布类型类型为 EXECUTE_SUCCESS 的事件
  • 执行 SQL 失败:发布类型类型为 EXECUTE_FAILURE 的事件

怎么订阅事件呢(目前 Sharding-JDBC 是没有订阅这些事件的,只是提供了事件发布订阅的功能而已)?非常简单,例子如下:

EventBusInstance.getInstance().register(new Runnable() {
      @Override
      public void run() {
      }

      @Subscribe // 订阅
      @AllowConcurrentEvents // 是否允许并发执行,即线程安全
      public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
          System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
      }

      @Subscribe // 订阅
      @AllowConcurrentEvents // 是否允许并发执行,即线程安全
      public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent
          System.out.println("DQLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
      }
    });
  • register() 任何类都可以,并非一定需要使用 Runnable 类。此处例子单纯因为方便
  • @Subscribe 注解在方法上,实现对事件的订阅
  • @AllowConcurrentEvents 注解在方法上,表示线程安全,允许并发执行
  • 方法上的参数对应的类即是订阅的事件。例如, #listen() 订阅了 DMLExecutionEvent 事件
  • EventBus#post() 发布事件,同步调用订阅逻辑

5. 结语

SQL 执行完毕之后,执行结果封装在ResultSet对象中,如:

Statement stmt =con.createStatement( ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("SELECT a, b FROM TABLE2");

多个 SQL 执行结果就会有多个ResultSet,必然需要进行合并。下一篇文章我们将探讨 SQL 结果归并,敬请关注~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335