三、MyCat 客户端 SQL 请求执行流程

  • RW#run 轮询注册队列中是否有 AbstractConnection,若存在且为读事件则调用 AbstractConnection#asynRead 异步读取数据,实际处理逻辑见 NIOSocketWR#asynRead

  • NIOSocketWR#asynRead 从 前端连接的 channel 中读取数据,并且保存到对应 AbstractConnectionreadBuffer 中,之后调用 AbstractConnection#onReadData 处理读取到的数据

    @Override
    public void asynRead() throws IOException {
       ByteBuffer theBuffer = con.readBuffer;
       if (theBuffer == null) {
          theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
          con.readBuffer = theBuffer;
       }
       // 从 SocketChannel 中读取数据,并且保存到 AbstractConnection 的 readBuffer 中,readBuffer 处于 write mode,返回读取了多少字节
       int got = channel.read(theBuffer);
       // 调用处理读取到的数据的方法
       con.onReadData(got);
    }
    
  • AbstractConnection#onReadData 读取 readBuffer 中的数据并调用 AbstractConnection#handle 方法进行下一步处理,其内部调用 FrontendCommandHandler#handle

  • FrontendCommandHandler#handle 根据 data[4] 来判断命令类型,客户端命令请求报文格式如下图:

data 的第五个字节存储命令类型,客户端命令请求报文命令类型详情表见附录1。我们以 MySQLPacket.COM_QUERY 为例进行接下来的讨论。当 data[4] == MySQLPacket.COM_QUERY 时,调用 FrontendConnection#query(byte[])

public void handle(byte[] data) {
    // 判断命令类型
    switch (data[4]) {
        ...
        // INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY
        case MySQLPacket.COM_QUERY:
            commands.doQuery();
            source.query(data);
            break;
      ...
    }
}
  • FrontendConnection#query(byte[])data 字节数组转化成 String 类型的 SQL, ServerQueryHandler#query(String) 方法

    public void query(byte[] data) {  
        MySQLMessage mm = new MySQLMessage(data);
        // 从 data[5] 即第六个字节开始读取参数体
        mm.position(5);
        String sql = mm.readString(charset);
        // 执行 sql 语句,内部调用 ServerQueryHandler#query(String)
        this.query( sql );
    }
    
  • ServerQueryHandler#query(String) 解析 SQL 类型,根据 sqlType 使用不同的 Handler 做处理

    @Override
    public void query(String sql) {
        ServerConnection c = this.source;
        /* 解析 SQL 类型 */
        int rs = ServerParse.parse(sql);
        int sqlType = rs & 0xff;
        
        switch (sqlType) {
            // explain2 datanode=? sql=?
            case ServerParse.EXPLAIN2:
                Explain2Handler.handle(sql, c, rs >>> 8);
                break;
            case ServerParse.SELECT:
                SelectHandler.handle(sql, c, rs >>> 8);
                break;
            case ...
            default:
                if (readOnly) {
                    LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
                    c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
                    break;
                }
                c.execute(sql, rs & 0xff);
        }
    }
    
  • 例如 sqlType == ServerParse.SELECT 时使用 SelectHandler 做进一步处理

    public static void handle(String stmt, ServerConnection c, int offs) {
        int offset = offs;
        c.setExecuteSql(null);
        switch (ServerParseSelect.parse(stmt, offs)) {
        case ServerParseSelect.DATABASE:
            SelectDatabase.response(c);
            break;
        case ServerParseSelect.USER:
            SelectUser.response(c);
            break;
        case ...
        default:
            c.setExecuteSql(stmt);
            c.execute(stmt, ServerParse.SELECT);
        }
    }
    
  • SelectHandler 进一步解析 select 语句,针对不同的 select 进行不同的处理,默认直接调用 ServerConnection#execute(java.lang.String, int) ,该方法首先进行一些常规检查(连接状态检查、事务状态检查、当前 DB 检查等),然后调用 ServerConnection#routeEndExecuteSQL 进行路由计算(包括全局序列号、SQL 语句拦截等。路由计算详细另述)并得到路由结果 RouteResultset,之后调用 NonBlockingSession#execute

    public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
        // 路由计算
        RouteResultset rrs = MycatServer
                    .getInstance()
                    .getRouterservice()
                    .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this);
        if (rrs != null) {
            // session 执行
            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
        }
    }
    
  • NonBlockingSession#execute 获取路由的 dataNode 节点,若节点数为 1 则调用 SingleNodeHandler#execute 处理 sql,否则调用 MultiNodeQueryHandler#execute 处理 SQL。此处我们假定前端 SQL 命令只路由到一个 dataNode,则调用 SingleNodeHandler#execute 处理 SQL

    /**
     * NonBlockingSession#execute
     */
    @Override
    public void execute(RouteResultset rrs, int type) {
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes.length == 1) {
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            singleNodeHandler.execute();
        } else {
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
            multiNodeHandler.execute();
        }
    }
    
  • SingleNodeHandler#execute 获取后端连接 BackendConnection,并调用 SingleNodeHandler#_execute,该方法直接调用 BackendConnection#execute

    public void execute() throws Exception {
        // 获取后端数据库连接
        final BackendConnection conn = session.getTarget(node);
        // 若存在 dataNode 对应的 BackendConnection
        if (session.tryExistsCon(conn, node)) {
            _execute(conn);
        } else {
            // create new connection
              do something...
        }
    }
    
    private void _execute(BackendConnection conn) {
        conn.execute(node, session.getSource(), session.getSource().isAutocommit());
    }
    
  • 当 schema.xml 中配置 <dataHost>dbDriver=='jdbc' 时,调用 JDBCConnection#execute 处理 SQL( JDBCConnection 继承 BackendConnection)。该方法新开一个线程处理 SQL,最终调用 JDBCConnection#ouputResultSet 执行 SQL 并将结果写入 ServerConnection

    // JDBCConnection.class
    @Override
    public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) {
        this.sqlSelectLimit = source.getSqlSelectLimit();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 调用 JDBCConnection#ouputResultSet
                executeSQL(node, source, autocommit);
            }
        };
        MycatServer.getInstance().getBusinessExecutor().execute(runnable);
    }
    
  • JDBCConnection#ouputResultSet 获取数据库连接并执行 SQL,然后将得到的结果集 ResultSet 解析为 ResultSet 响应报文并写入 ServerConnection

    private void ouputResultSet(ServerConnection sc, String sql) throws SQLException {
        ResultSet rs = null;
        Statement stmt = null;
        
        try {
            stmt = con.createStatement();
            rs = stmt.executeQuery(sql);
    
            List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();
            // 根据 resultset 加载列信息,保存至 fieldPks
            ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,
                    this.isSpark);
            // 获取列数
            int colunmCount = fieldPks.size();
            ByteBuffer byteBuf = sc.allocate();
    
            /* 1 写入 resultset header packet */
            ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
            headerPkg.fieldCount = fieldPks.size();
            headerPkg.packetId = ++packetId;
            // 将 ResultSetHeaderPacket 的数据写入 byteBuf
            byteBuf = headerPkg.write(byteBuf, sc, true);
            byteBuf.flip();
            byte[] header = new byte[byteBuf.limit()];
            // 将 byteBuf 中的信息写入 header 中
            byteBuf.get(header);
            // byteBuf 标记归位
            byteBuf.clear();
    
            /* 2 写入 field packet */
            List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
            Iterator<FieldPacket> itor = fieldPks.iterator();
            while (itor.hasNext()) {
                FieldPacket curField = itor.next();
                curField.packetId = ++packetId;
                // 将 FieldPacket 的数据写入 byteBuf
                byteBuf = curField.write(byteBuf, sc, false);
                // position 设回 0,并将 limit 设成之前的 position 的值
                // limit:缓冲区数组中不可操作的下一个元素的位置:limit<=capacity
                byteBuf.flip();
                byte[] field = new byte[byteBuf.limit()];
                // 将 byteBuf 中的信息写入 field 中
                byteBuf.get(field);
                byteBuf.clear();
                // 将 field 放入 fields
                fields.add(field);
            }
            /* 3 写入 EOF packet */
            EOFPacket eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            // 将 EOFPacket 的数据写入 byteBuf
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] eof = new byte[byteBuf.limit()];
            // 将 byteBuf 中的信息写入 eof 中
            byteBuf.get(eof);
            byteBuf.clear();
            this.respHandler.fieldEofResponse(header, fields, eof, this);
    
            /* 4 写入 Row Data packet */
            // output row
            while (rs.next()) {
                ResultSetMetaData resultSetMetaData = rs.getMetaData();
                int size = resultSetMetaData.getColumnCount();
                StringBuilder builder = new StringBuilder();
                for (int i = 1; i <= size; i++) {
                    builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i));
                    if (i < size) {
                        builder.append(", ");
                    }
                }
                LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString());
                RowDataPacket curRow = new RowDataPacket(colunmCount);
                for (int i = 0; i < colunmCount; i++) {
                    int j = i + 1;
                    if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                        curRow.add(rs.getBytes(j));
                    } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                            fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                        // ensure that do not use scientific notation format
                        BigDecimal val = rs.getBigDecimal(j);
                        curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null,
                                sc.getCharset()));
                    } else {
                        curRow.add(StringUtil.encode(rs.getString(j),
                                sc.getCharset()));
                    }
    
                }
                curRow.packetId = ++packetId;
                // 将 RowDataPacket 的数据写入 byteBuf
                byteBuf = curRow.write(byteBuf, sc, false);
                byteBuf.flip();
                byte[] row = new byte[byteBuf.limit()];
                byteBuf.get(row);
                byteBuf.clear();
                this.respHandler.rowResponse(row, this);
            }
    
            fieldPks.clear();
    
            // end row
            /* 5 写入 EOF packet */
            eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            eof = new byte[byteBuf.limit()];
            byteBuf.get(eof);
            sc.recycle(byteBuf);
            this.respHandler.rowEofResponse(eof, this);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
    
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
    
                }
            }
        }
    }
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352