NoSQL 根本性的优势在于在云计算时代,简单、易于大规模分布式扩展,并且读写性能非常高
关系型数据库 | NoSQL 数据库 | |
---|---|---|
特点 | -数据关系模型基于关系模型,结构化存储,完整性约束 -基于二维表及其之间的联系,需要连接、并、交、差、除等数据操作 -采用结构化的查询语言(SQL)做数据读写 -操作需要数据的一致性,需要事务甚至是强一致性 |
- 非结构化的存储 - 基于多维关系模型 - 具有特有的使用场景 |
优点 | - 保持数据的一致性(事务处理) - 可以进行 join 等复杂查询 - 通用化,技术成熟 |
- 高并发,大数据下读写能力较强 - 基本支持分布式,易于扩展,可伸缩 - 简单,弱结构化存储 |
缺点 | - 数据读写必须经过 sql 解析,大量数据、高并发下读写性能不足 - 对数据做读写,或修改数据结构时需要加锁,影响并发操作 - 无法适应非结构化存储 - 扩展困难 - 昂贵、复杂 |
- join 等复杂操作能力较弱 - 事务支持较弱 - 通用性差 - 无完整约束复杂业务场景支持较差 |
数据的切分(Sharding)根据其切分规则的类型,可以分为两种切分模式。一种是按照不同的表(或者Schema)来切分到不同的数据库(主机)之上,这种切可以称之为数据的垂直(纵向)切分;另外一种则是根据表中的数据的逻辑关系,将同一个表中的数据按照某种条件拆分到多台数据库(主机)上面,这种切分称之为数据的水平(横向)切分
对于 DBA 来说,可以这么理解 Mycat:
Mycat 就是 MySQL Server,而 Mycat 后面连接的 MySQL Server,就好象是 MySQL 的存储引擎,如 InnoDB,MyISAM 等,因此,Mycat 本身并不存储数据,数据是在后端的 MySQL 上存储的,因此数据可靠性以及事务等都是 MySQL 保证的,简单的说,Mycat 就是 MySQL 最佳伴侣,它在一定程度上让 MySQL 拥有了能跟 Oracle PK 的能力。
对于软件工程师来说,可以这么理解 Mycat:
Mycat 就是一个近似等于 MySQL 的数据库服务器,你可以用连接 MySQL 的方式去连接 Mycat(除了端口不同,默认的 Mycat 端口是 8066 而非 MySQL 的 3306,因此需要在连接字符串上增加端口信息),大多数情况下,可以用你熟悉的对象映射框架使用 Mycat,但建议对于分片表,尽量使用基础的 SQL 语句,因为这样能达到最佳性能,特别是几千万甚至几百亿条记录的情况下。
对于架构师来说,可以这么理解 Mycat:
Mycat 是一个强大的数据库中间件,不仅仅可以用作读写分离、以及分表分库、容灾备份,而且可以用于多租户应用开发、云平台基础设施、让你的架构具备很强的适应性和灵活性,借助于即将发布的 Mycat 智能优化模块,系统的数据访问瓶颈和热点一目了然,根据这些统计分析数据,你可以自动或手工调整后端存储,将不同的表映射到不同存储引擎上,而整个应用的代码一行也不用改变。
Mycat 原理
Mycat 的原理中最重要的一个动词是“拦截”,它拦截了用户发送过来的 SQL 语句,首先对 SQL 语句做了一些特定的分析:如分片分析、路由分析、读写分离分析、缓存分析等,然后将此 SQL 发往后端的真实数据库,并将返回的结果做适当的处理,最终再返回给用户
上述图片里,Orders 表被分为三个分片 datanode(简称 dn),这三个分片是分布在两台 MySQL Server 上(DataHost),即 datanode=database@datahost 方式,因此你可以用一台到 N 台服务器来分片,分片规则为(sharding rule)典型的字符串枚举分片规则,一个规则的定义是分片字段(sharding column)+分片函数(rule function),这里的分片字段为 prov 而分片函数为字符串枚举方式
当 Mycat 收到一个 SQL 时,会先解析这个 SQL,查找涉及到的表,然后看此表的定义,如果有分片规则,则获取到 SQL 里分片字段的值,并匹配分片函数,得到该 SQL 对应的分片列表,然后将 SQL 发往这些分片去执行,最后收集和处理所有分片返回的结果数据,并输出到客户端。以 select * from Orders where prov=?语句为例,查到 prov=wuhan,按照分片函数,wuhan 返回 dn1,于是 SQL 就发给了 MySQL1,去取 DB1 上的查询结果,并返回给用户。
如果上述 SQL 改为 select * from Orders where prov in (‘wuhan’,‘beijing’),那么,SQL 就会发给MySQL1 与 MySQL2 去执行,然后结果集合并后输出给用户。但通常业务中我们的 SQL 会有 Order By 以及Limit 翻页语法,此时就涉及到结果集在 Mycat 端的二次处理,这部分的代码也比较复杂,而最复杂的则属两个表的 Jion 问题,为此,Mycat 提出了创新性的 ER 分片、全局表、HBT(Human Brain Tech)人工智能的 Catlet、以及结合 Storm/Spark 引擎等十八般武艺的解决办法,从而成为目前业界最强大的方案,这就是开源的力量
应用场景
- 单纯的读写分离,此时配置最为简单,支持读写分离,主从切换;
- 分表分库,对于超过 1000 万的表进行分片,最大支持 1000 亿的单表分片;
- 多租户应用,每个应用一个库,但应用程序只连接 Mycat,从而不改造程序本身,实现多租户化;
- 报表系统,借助于 Mycat 的分表能力,处理大规模报表的统计;
- 替代 Hbase,分析大数据;
- 作为海量数据实时查询的一种简单有效方案,比如 100 亿条频繁查询的记录需要在 3 秒内查询出来结果,除了基于主键的查询,还可能存在范围查询或其他属性查询,此时 Mycat 可能是最简单有效的选择
Mycat 中的概念
数据库中间件
Mycat 是数据库中间件,就是介于数据库与应用之间,进行数据处理与交互的中间服务。由于前面讲的对数据进行分片处理之后,从原有的一个库,被切分为多个分片数据库,所有的分片数据库集群构成了整个完整的数据库存储
逻辑库(schema)
通常对实际应用来说,并不需要知道中间件的存在,业务开发人员只需要知道数据库的概念,所以数据库中间件可以被看做是一个或多个数据库集群构成的逻辑库
逻辑表(table)
ER 表
关系型数据库是基于实体关系模型(Entity-Relationship Model)之上,通过其描述了真实世界中事物与关系,Mycat 中的 ER 表即是来源于此。根据这一思路,提出了基于 E-R 关系的数据分片策略,子表的记录与所关联的父表记录存放在同一个数据分片上,即子表依赖于父表,通过表分组(Table Group)保证数据 Join 不会跨库操作
全局表
一个真实的业务系统中,往往存在大量的类似字典表的表,这些表基本上很少变动,字典表具有以下几个特性:
• 变动不频繁;
• 数据量总体变化不大;
• 数据规模不大,很少有超过数十万条记录。
对于这类的表,在分片的情况下,当业务表因为规模而进行分片以后,业务表与这些附属的字典表之间的关联,就成了比较棘手的问题,所以 Mycat 中通过数据冗余来解决这类表的 join,即所有的分片都有一份数据的拷贝,所有将字典表或者符合字典表特性的一些表定义为全局表。数据冗余是解决跨分片数据 join 的一种很好的思路,也是数据切分规划的另外一条重要规则
分片节点(dataNode)
数据切分后,一个大表被分到不同的分片数据库上面,每个表分片所在的数据库就是分片节点(dataNode)
节点主机(dataHost)
数据切分后,每个分片节点(dataNode)不一定都会独占一台机器,同一机器上面可以有多个分片数据库,这样一个或多个分片节点(dataNode)所在的机器就是节点主机(dataHost),为了规避单节点主机并发数限制,尽量将读写压力高的分片节点(dataNode)均衡的放在不同的节点主机(dataHost)
分片规则(rule)
按照某种业务规则把数据分到某个分片的规则就是分片规则,数据切分选择合适的分片规则非常重要,将极大的避免后续数据处理的难度
Mycat 的配置
1 schema.xml
管理着 MyCat 的逻辑库、表、分片规则、DataNode 以 及 DataSource
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">
<schema name="TESTDB" checkSQLschema="true" sqlMaxLimit="100" randomDataNode="dn1">
<table name="customer" primaryKey="id" dataNode="dn1,dn2" rule="sharding-by-intfile" autoIncrement="true" fetchStoreNodeByJdbc="true">
<childTable name="customer_addr" primaryKey="id" joinKey="customer_id" parentKey="id"> </childTable>
</table>
</schema>
<!-- dataNode 标签定义了 MyCat 中的数据节点,也就是我们通常说所的数据分片。一个 dataNode 标签就是一个独立的数据分片 -->
<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />
<dataNode name="dn3" dataHost="localhost1" database="db3" />
<!-- 定义了具体的数据库实例、读写分离配置和心跳语句 -->
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- can have multi write hosts -->
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306" user="root" password="root">
<readHost host="" url="" password="" user=""></readHost>
</writeHost>
<!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
</dataHost>
</mycat:schema>
1.1 schema 标签
schema 标签用于定义 MyCat 实例中的逻辑库,MyCat 可以有多个逻辑库,每个逻辑库都有自己的相关配置。可以使用 schema 标签来划分这些不同的逻辑库。
属性名 | 属性说明 |
---|---|
dataNode | 该属性用于绑定逻辑库到某个具体的 database 上,1.3 版本如果配置了 dataNode,则不可以配置分片表,1.4 可以配置默认分片,只需要配置需要分片的表即可 |
checkSQLschema | 为 true 时,schema 的字符去掉,select * from TESTDB.travelrecord 会修改为 select * from travelrecord |
sqlMaxLimit | 当该值设置为某个数值时。每条执行的 SQL 语句,如果没有加上 limit 语句,MyCat 也会自动的加上所对应的值 |
1.1.1 table 标签
定义了 MyCat 中的逻辑表,所有需要拆分的表都需要在这个标签中定义
属性名 | 属性说明 |
---|---|
name | 定义逻辑表的表名,这个名字就如同我在数据库中执行 create table 命令指定的名字一样,同个 schema 标签中定义的名字必须唯一 |
dataNode | 定义这个逻辑表所属的 dataNode, 该属性的值需要和 dataNode 标签中 name 属性的值相互对应 |
rule | 该属性用于指定逻辑表要使用的规则名字,规则名字在 rule.xml 中定义,必须与 tableRule 标签中 name 属性属性值一一对应 |
ruleRequired | 该属性用于指定表是否绑定分片规则,如果配置为 true,但没有配置具体 rule 的话 ,程序会报错 |
primaryKey | 该逻辑表对应真实表的主键 |
type | 该属性定义了逻辑表的类型,目前逻辑表只有“全局表”和”普通表”两种类型。对应的配置: • 全局表:global • 普通表:不指定该值为 globla 的所有表 |
autoIncrement | 使用 autoIncrement=“true” 指定这个表有使用自增长主键 |
subTables | |
needAddLimit | 指定表是否需要自动的在每个语句后面加上 limit 限制 |
1.1.1.1 childTable 标签
childTable 标签用于定义 E-R 分片的子表。通过标签上的属性与父表进行关联
属性名 | 属性说明 |
---|---|
name | 定义子表的表名 |
joinKey | 插入子表的时候会使用这个列的值查找父表存储的数据节点 |
parentKey | 属性指定的值一般为与父表建立关联关系的列名。程序首先获取 joinkey 的值,再通过 parentKey 属性指定的列名产生查询语句,通过执行该语句得到父表存储在哪个分片上。从而确定子表存储的位置 |
primaryKey | 同 table 标签所描述的 |
needAddLimit | 同 table 标签所描述的 |
1.2 dataNode 标签
定义了 MyCat 中的数据节点,也就是我们通常说所的数据分片。一个 dataNode 标签就是一个独立的数据分片
<dataNode name="dn1" dataHost="lch3307" database="db1" ></dataNode>
例子中所表述的意思为:使用名字为 lch3307 数据库实例上的 db1 物理数据库,这就组成一个数据分片,最后,我们使用名字 dn1 标识这个分片
属性名 | 属性说明 |
---|---|
name | 定义数据节点的名字,这个名字需要是唯一的,我们需要在 table 标签上应用这个名字,来建立表与分片对应的关系 |
dataHost | 该属性用于定义该分片属于哪个数据库实例的,属性值是引用 dataHost 标签上定义的 name 属性 |
database | 该属性用于定义该分片属性哪个具体数据库实例上的具体库,因为这里使用两个纬度来定义分片,就是:实例+具体的库。因为每个库上建立的表和表结构是一样的。所以这样做就可以轻松的对表进行水平拆分 |
1.3 dataHost 标签
属性名 | 属性说明 |
---|---|
name | 唯一标识 dataHost 标签,供上层的标签使用 |
maxCon | 指定每个读写实例连接池的最大连接 |
minCon | 指定每个读写实例连接池的最小连接,初始化连接池的大小 |
balance | 1. balance="0", 不开启读写分离机制,所有读操作都发送到当前可用的 writeHost 上。 2. balance="1",全部的 readHost 与 stand by writeHost 参与 select 语句的负载均衡,简单的说,当双主双从模式(M1->S1,M2->S2,并且 M1 与 M2 互为主备),正常情况下,M2,S1,S2 都参与 select 语句的负载均衡。 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力,注意 balance=3 只在 1.4 及其以后版本有,1.3 没有 |
writeType | 1. writeType="0",所有写操作发送到配置的第一个 writeHost,第一个挂了切到还生存的第二个 writeHost,重新启动后已切换后的为准,切换记录在配置文件中:dnindex.properties . 2. writeType="1",所有写操作都随机的发送到配置的 writeHost,1.5 以后废弃不推荐 |
dbType | 指定后端连接的数据库类型,例如:mongodb、oracle、spark 等 |
dbDriver | 指定连接后端数据库使用的 Driver,目前可选的值有 native 和 jdbc 如果使用 JDBC 的话需要将符合 JDBC 4 标准的驱动 JAR 包放到 MYCAT\lib 目录下,并检查驱动 JAR 包中包括如下目录结构的文件:META-INF\services\java.sql.Driver。在这个文件内写上具体的 Driver 类名,例如:com.mysql.jdbc.Driver |
switchType | switchType = -1, 表示不自动切换 switchType = 1 默认值,自动切换 switchType = 2 基于 MySQL 主从同步的状态决定是否切换心跳语句为 show slave status switchType = 3 基于 MySQL galary cluster 的切换机制(适合集群)(1.4.1)心跳语句为 show status like ‘wsrep%’ |
tempReadHostAvailable | 如果配置了这个属性 writeHost 下面的 readHost 仍旧可用,默认 0 可配置(0、1) |
1.3.1 heartbeat 标签
这个标签内指明用于和后端数据库进行心跳检查的语句。例如,MYSQL 可以使用 select user(),Oracle 可以使用 select 1 from dual 等
1.3.2 writeHost 标签、readHost 标签
这两个标签都指定后端数据库的相关配置给 mycat,用于实例化后端连接池。唯一不同的是,writeHost 指定写实例、readHost 指定读实例
在一个 dataHost 内可以定义多个 writeHost 和 readHost。但是,如果 writeHost 指定的后端数据库宕机,那么这个 writeHost 绑定的所有 readHost 都将不可用。另一方面,由于这个 writeHost 宕机系统会自动的检测到,并切换到备用的 writeHost 上去
属性名 | 属性说明 |
---|---|
host | 用于标识不同实例,一般 writeHost 我们使用 *M1,readHost 我们用 *S1 |
url | 后端实例连接地址,如果是使用 native 的 dbDriver,则一般为 address:port 这种形式。用 JDBC 或其他的 dbDriver,则需要特殊指定。当使用 JDBC 时则可以这么写:jdbc:mysql://localhost:3306/ |
password | 后端存储实例需要的密码 |
user | 后端存储实例需要的用户名字 |
weight | 权重,配置在 readhost 中作为读节点的权重(1.4 以后) |
usingDecrypt | 是否对密码加密默认 0 否 如需要开启配置 1,同时使用加密程序对密码加密 |
2 server.xml
server.xml 几乎保存了所有 mycat 需要的系统配置信息。其在代码内直接的映射类为 SystemConfig 类
2.1 user 标签
<user name="test">
<property name="password">test</property>
<property name="schemas">TESTDB</property>
<property name="readOnly">true</property>
<property name="benchmark">11111</property>
<property name="usingDecrypt">1</property>
<privileges check="false">
<schema name="TESTDB" dml="0010" showTables="custome/mysql">
<table name="tbl_user" dml="0110"></table>
<table name="tbl_dynamic" dml="1111"></table>
</schema>
</privileges>
</user>
server.xml 中的标签本就不多,这个标签主要用于定义登录 mycat 的用户和权限。例如上面的例子中,我定义了一个用户,用户名为 test、密码也为 test,可访问的 schema 也只有 TESTDB 一个
属性名 | 属性说明 |
---|---|
Benchmark | benchmark 基准, 当前端的整体 connection 数达到基准值是, 对来自该账户的请求开始拒绝连接,0 或不设表示不限制 |
usingDecrypt | 是否对密码加密默认 0 否 如需要开启配置 1,同时使用加密程序对密码加密 |
2.1.1 privileges 标签
对用户的 schema 及 下级的 table 进行精细化的 DML 权限控制,privileges 节点中的 check 属性是用于标识是否开启 DML 权限检查, 默认 false 标识不检查,当然 privileges 节点不配置,等同 check=false,由于 Mycat 一个用户的 schemas 属性可配置多个 schema ,所以 privileges 的下级节点 schema 节点同样可配置多个,对多库多表进行细粒度的 DML 权限控制
Schema/Table 上的 dml 属性描述:
属性名 | 属性说明 | 示例(禁止增删改查) |
---|---|---|
dml | insert,update,select,delete | 0000 |
<!-- 禁止 insert 和 delete 操作 -->
<privileges check="true">
<schema name="TESTDB" dml="0110" >
<table name="table01" dml="0111"></table>
<table name="table02" dml="1111"></table>
</schema>
<schema name="TESTDB1" dml="0110">
<table name="table03" dml="1110"></table>
<table name="table04" dml="1010"></table>
</schema>
</privileges>
2.2 system 标签
这个标签内嵌套的所有 property 标签都与系统配置有关,请注意,下面我会省去标签 property 直接使用这个标签的 name 属性内的值来介绍这个属性的作用
2.2.1 charset 属性
字符集设置
2.2.2 defaultSqlParser 属性
由于 mycat 最初是时候 Foundation DB 的 sql 解析器,而后才添加的 Druid 的解析器。所以这个属性用来指定默认的解析器。目前的可用的取值有:druidparser 和 fdbparser。使用的时候可以选择其中的一种,目前一般都使用 druidparser
2.2.3 processors 属性
这个属性主要用于指定系统可用的线程数,默认值为机器 CPU 核心线程数。主要影响 processorBufferPool、processorBufferLocalPercent、processorExecutor 属性。NIOProcessor 的个数也是由这个属性定义的,所以调优的时候可以适当的调高这个属性
2.2.4 processorBufferChunk 属性
这个属性指定每次分配 Socket Direct Buffer 的大小,默认是 4096 个字节。这个属性也影响 buffer pool 的长度。如果一次性获取的数过大 buffer 不够用经常出现警告,则可以适当调大
2.2.5 processorBufferPool 属性
这个属性指定 bufferPool 计算比例值。由于每次执行 NIO 读、写操作都需要使用到 buffer,系统初始化的时候会建立一定长度的 buffer 池来加快读、写的效率,减少建立 buffer 的时间
Mycat 中有两个主要的 buffer 池:
- BufferPool
- ThreadLocalPool
BufferPool 由 ThreadLocalPool 组合而成,每次从 BufferPool 中获取 buffer 都会优先获取 ThreadLocalPool 中的 buffer,未命中之后才会去获取 BufferPool 中的 buffer。也就是说 ThreadLocalPool 是作为 BufferPool 的二级缓存,由每个线程内部自己使用。当然这其中还有一些限制条件需要线程的名字是由$_开头。然而 BufferPool 上的 buffer 则是每个 NIOProcessor 都共享的。
默认这个属性的值为: 默认 bufferChunkSize(4096) * processors 属性 * 1000
BufferPool 的总长度 = bufferPool / bufferChunk
若 bufferPool 不是 bufferChunk 的整数倍,则总长度为前面计算得出的商 + 1假设系统线程数为 4,其他都为属性的默认值,则:bufferPool = 4096 * 4 * 1000,BufferPool 的总长度 : 4000 = 16384000 / 4096
2.2.6 processorBufferLocalPercent 属性
前面提到了 ThreadLocalPool。这个属性就是用来控制分配这个 pool 的大小用的,但其也并不是一个准确的值,也是一个比例值。这个属性默认值为 100。
线程缓存百分比 = bufferLocalPercent / processors 属性。
例如,系统可以同时运行 4 个线程,使用默认值,则根据公式每个线程的百分比为 25。最后根据这个百分比来计算出具体的 ThreadLocalPool 的长度公式如下:
ThreadLocalPool 的长度 = 线程缓存百分比 * BufferPool 长度 / 100
假设 BufferPool 的长度为 4000,其他保持默认值。那么最后每个线程建立上的 ThreadLocalPool 的长度为: 1000 = 25 * 4000 / 100
2.2.7 processorExecutor 属性
这个属性主要用于指定 NIOProcessor 上共享的 businessExecutor 固定线程池大小。mycat 在需要处理一些异步逻辑的时候会把任务提交到这个线程池中。新版本中这个连接池的使用频率不是很大了,可以设置一个较小的值
2.2.8 sequnceHandlerType 属性
指定使用 Mycat 全局序列的类型。0 为本地文件方式,1 为数据库方式,2 为时间戳序列方式,3 为分布式 ZK ID 生成器,4 为 zk 递增 id 生成
3 rule.xml
rule.xml 里面就定义了我们对表进行拆分所涉及到的规则定义。我们可以灵活的对表使用不同的分片算法,或者对表使用相同的算法但具体的参数不同。这个文件里面主要有 tableRule 和 function 这两个标签。在具体使用过程中可以按照需求添加 tableRule 和 function
3.1 tableRule 标签
<tableRule name="mod-long">
<rule>
<columns>id</columns>
<algorithm>mod-long</algorithm>
</rule>
</tableRule>
属性名 | 属性说明 |
---|---|
name | 属性指定唯一的名字,用于标识不同的表规则 |
内嵌的 rule 标签则指定对物理表中的哪一列进行拆分和使用什么路由算法:
标签名 | 标签说明 |
---|---|
columns | 指定要拆分的列名字 |
algorithm | 使用 function 标签中的 name 属性 |
3.2 function 标签
<function name="mod-long" class="io.mycat.route.function.PartitionByMod">
<!-- how many data nodes -->
<property name="count">3</property>
</function>
属性名 | 属性说明 |
---|---|
name | 指定算法的名字 |
class | 制定路由算法具体的类名字 |
property | 算法需要用到的一些属性 |
Mycat 的分片 join
Mycat 目前版本支持跨分片的 join,主要实现的方式有四种:全局表,ER 分片,catletT(人工智能)和 ShareJoin,ShareJoin 在开发版中支持,前面三种方式 1.3.0.1 支持
1 全局表
2 ER分片
MyCAT 借鉴了 NewSQL 领域的新秀 Foundation DB 的设计思路,Foundation DB 创新性的提出了 TableGroup 的概念,其将子表的存储位置依赖于主表,并且物理上紧邻存放,因此彻底解决了 JION 的效率和性能问题,根据这一思路,提出了基于 E-R 关系的数据分片策略,子表的记录与所关联的父表记录存放在同一个数据分片上
customer 采用 sharding-by-intfile 这个分片策略,分片在 dn1,dn2 上,orders 依赖父表进行分片,两个表的关联关系为 orders.customer_id=customer.id。于是数据分片和存储的示意图如下:
这样一来,分片 Dn1 上的的 customer 与 Dn1 上的 orders 就可以进行局部的 JOIN 联合,Dn2 上也如此,再合并两个节点的数据即可完成整体的 JOIN,试想一下,每个分片上 orders 表有 100 万条,则 10 个分片就有 1 个亿,基于 E-R 映射的数据分片模式,基本上解决了 80%以上的企业应用所面临的问题
3 Share join
ShareJoin 是一个简单的跨分片 Join,基于 HBT 的方式实现
目前支持 2 个表的 join,原理就是解析 SQL 语句,拆分成单表的 SQL 语句执行,然后把各个节点的数据汇集
4 catlet(人工智能)
全局序列号
在实现分库分表的情况下,数据库自增主键已无法保证自增主键的全局唯一。为此,MyCat 提供了全局sequence,并且提供了包含本地配置和数据库配置等多种实现方式
1 本地文件方式
2 数据库配置方式
Mycat SQL 拦截机制
用户可以写一个 java 类,将传入 MyCAT 的 SQL 进行改写然后交给Mycat 去执行,此技巧可以完成如下一些特殊功能:
- 捕获和记录某些特殊的 SQL;
- 记录 sql 查找异常;
- 出于性能优化的考虑,改写 SQL,比如改变查询条件的顺序或增加分页限制;
- 将某些 Select SQL 强制设置为 Read 模式,走读写分离(很多事务框架很难剥离事务中的 Select SQL;
- 后期 Mycat 智能优化,拦截所有 sql 做智能分析,自动监控节点负载,自动优化路由,提供数据库优化建议
SQL 拦截的原理是在路由之前拦截 SQL,然后做其他处理,完了之后再做路由,执行,如下图所示:
二、实现自定义 SQL 拦截器
2.1 实现 SQLInterceptor 接口
/**
* used for interceptor sql before execute ,can modify sql befor execute
* @author wuzhih
*
*/
public interface SQLInterceptor {
/**
* return new sql to handler,ca't modify sql's type
* @param sql
* @param sqlType
* @return new sql
*/
String interceptSQL(String sql ,int sqlType);
}
例如,我们自定义实现一个 SQL 拦截器,对每个 SQL 语句进行 EXPLAIN 分析,找出潜在的慢 sql
2.2 在 server.xml 中添加拦截器配置
运行采坑记录
一、Client does not support authentication protocol requested by server
解决方案:
USE mysql;
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${password}';
FLUSH PRIVILEGES;
官方解决链接:https://dev.mysql.com/doc/refman/5.6/en/old-client.html
源码解析
一、mycat 通信模型
前端与后端通信框架都为NIO/AIO,因为目前生产上用的linux发行版内核都没有真正实现网络上的AIO,如果应用用AIO的话可能比NIO还要慢一些,所以,我们这里只分析NIO相关的通信模块。
- NIOAcceptor:作为服务器接受客户端连接(前端 NIO 通信)
- NIOConnector:作为客户端去连接后台数据库(MySql,后端 NIO 通信)
- NIOReactor:Reactor 模式的 NIO,处理并转发请求到 RW 线程,其实就是把对应 AbstractConnection(就是 NIO 的 channel 的封装)注册到 RW 线程的selector上,只注册读标记
- NIOReactorPool:一般高性能网络通信框架采用多 Reactor(多 dispatcher)模式,这里将 NIOReactor 池化;每次 NIOConnector 接受一个连接或者NIOAcceptor 请求一个连接,都会封装成 AbstractConnection,同时请求 NIOReactorPool 每次轮询出一个 NIOReactor,之后 AbstractConnection 与这个NIOReactor 绑定(就是3之中说的注册)
- RW:RW 线程,负责执行 NIO 的 channel 读写,这里 channel 封装成了 AbstractConnection
- NIOSocketWR:每个前端连接(FrontendConnection)和后端连接(BackendConnection)都有一个对应的缓冲区,对连接读写操作具体如何操作的方法和缓存方式,封装到了这个类里面
FrontendConnection
客户端与 mycat 的连接,子类有 ServerConnection 和 ManagerConnection
BackendConnection
mycat 连接后天数据库的 connection,子类有 MySqlConnection、JDBCConnection
二、客户端连接建立及认证
- NIOAcceptor#run 方法轮询是否有连接请求,若客户端有连接请求到 mycat,则调用 io.mycat.net.NIOAcceptor#accept 方法
- NIOAcceptor#accept 方法调用 serverChannel.accept 创建一个 SocketChannel,并根据 SocketChannel 创建一个新的前端连接实例 FrontendConnection,然后从 NIOReactor 池中获取的 NIOReactor 实例,并调用 NIOReactor#postRegister 方法将 FrontendConnection 放入 RW 线程的注册队列,然后唤醒 RW 线程的 selector
- RW#run 轮询注册队列中是否有 AbstractConnection,若有则将其取出,并作为读事件注册到 tSelector。此外还调用 FrontendConnection#register 进行前端连接建立和认证
三、客户端 SQL 请求执行流程
RW#run 轮询注册队列中是否有 AbstractConnection,若存在且为读事件则调用 AbstractConnection#asynRead 异步读取数据并梳理,实际处理逻辑见 NIOSocketWR#asynRead
-
NIOSocketWR#asynRead 从 前端连接的 channel 中读取数据,并且保存到对应 AbstractConnection 的 readBuffer 中,之后调用 AbstractConnection#onReadData 处理读取到的数据
@Override public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize()); con.readBuffer = theBuffer; } // 从 SocketChannel 中读取数据,并且保存到 AbstractConnection 的 readBuffer 中,readBuffer 处于 write mode,返回读取了多少字节 int got = channel.read(theBuffer); // 调用处理读取到的数据的方法 con.onReadData(got); }
AbstractConnection#onReadData 读取 readBuffer 中的数据并调用 AbstractConnection#handle 方法进行下一步处理,其内部调用 FrontendCommandHandler#handle
FrontendCommandHandler#handle 根据 data[4] 来判断命令类型,客户端命令请求报文格式如下图:
data 的第五个字节存储命令类型,客户端命令请求报文命令类型详情表见附录1。我们以 MySQLPacket.COM_QUERY 为例进行接下来的讨论。当 data[4] == MySQLPacket.COM_QUERY 时,调用 FrontendConnection#query(byte[])
public void handle(byte[] data) {
// 判断命令类型
switch (data[4]) {
...
// INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
...
}
}
-
FrontendConnection#query(byte[]) 将 data 字节数组转化成 String 类型的 sql,之后调用 ServerQueryHandler#query(String) 方法
public void query(byte[] data) { MySQLMessage mm = new MySQLMessage(data); // 从 data[5] 即第六个字节开始读取参数体 mm.position(5); String sql = mm.readString(charset); // 执行 sql 语句,内部调用 ServerQueryHandler#query(String) this.query( sql ); }
-
ServerQueryHandler#query(String) 解析 SQL 类型,根据不同类型做不同的处理,之后调用 ServerConnection#routeEndExecuteSQL 进行路由计算(包括全局序列号、SQL 语句拦截等。路由计算详细另述)并得到路由结果 RouteResultset,然后调用 NonBlockingSession#execute
public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { // 路由计算 RouteResultset rrs = MycatServer .getInstance() .getRouterservice() .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this); if (rrs != null) { // session 执行 session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); } }
-
NonBlockingSession#execute 获取路由的 dataNode 节点,若节点数为1则调用 SingleNodeHandler#execute 处理 sql,否则调用 MultiNodeQueryHandler#execute 处理 sql。此处我们假定前端 sql 命令只路由到一个 dataNode,则调用 SingleNodeHandler#execute 处理 sql
/** * NonBlockingSession#execute */ @Override public void execute(RouteResultset rrs, int type) { RouteResultsetNode[] nodes = rrs.getNodes(); if (nodes.length == 1) { singleNodeHandler = new SingleNodeHandler(rrs, this); singleNodeHandler.execute(); } else { multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this); multiNodeHandler.execute(); } }
-
SingleNodeHandler#execute 获取后端连接 BackendConnection,并调用 SingleNodeHandler#_execute,该方法直接调用 BackendConnection#execute
public void execute() throws Exception { // 获取后端数据库连接 final BackendConnection conn = session.getTarget(node); // 若存在 dataNode 对应的 BackendConnection if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection do something... } } private void _execute(BackendConnection conn) { conn.execute(node, session.getSource(), session.getSource().isAutocommit()); }
-
当 schema.xml 中配置 dataHost 的 dbDriver 为 jdbc 时,调用 JDBCConnection#execute 进行 sql 执行(JDBCConnection 继承 BackendConnection)。该方法新开一个线程处理 sql,最终调用 JDBCConnection#ouputResultSet 执行 sql 并将结果写入 ServerConnection
@Override public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) { this.sqlSelectLimit = source.getSqlSelectLimit(); Runnable runnable = new Runnable() { @Override public void run() { // 调用 JDBCConnection#ouputResultSet executeSQL(node, source, autocommit); } }; MycatServer.getInstance().getBusinessExecutor().execute(runnable); }
-
JDBCConnection#ouputResultSet 获取数据库连接并执行 sql,然后将得到的结果集 ResultSet 解析为 Result Set 响应报文并写入 ServerConnection
private void ouputResultSet(ServerConnection sc, String sql) throws SQLException { ResultSet rs = null; Statement stmt = null; try { stmt = con.createStatement(); rs = stmt.executeQuery(sql); List<FieldPacket> fieldPks = new LinkedList<FieldPacket>(); // 根据 resultset 加载列信息,保存至 fieldPks ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark); // 获取列数 int colunmCount = fieldPks.size(); ByteBuffer byteBuf = sc.allocate(); /* 1 写入 resultset header packet */ ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket(); headerPkg.fieldCount = fieldPks.size(); headerPkg.packetId = ++packetId; // 将 ResultSetHeaderPacket 的数据写入 byteBuf byteBuf = headerPkg.write(byteBuf, sc, true); byteBuf.flip(); byte[] header = new byte[byteBuf.limit()]; // 将 byteBuf 中的信息写入 header 中 byteBuf.get(header); // byteBuf 标记归位 byteBuf.clear(); /* 2 写入 field packet */ List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size()); Iterator<FieldPacket> itor = fieldPks.iterator(); while (itor.hasNext()) { FieldPacket curField = itor.next(); curField.packetId = ++packetId; // 将 FieldPacket 的数据写入 byteBuf byteBuf = curField.write(byteBuf, sc, false); // position 设回 0,并将 limit 设成之前的 position 的值 // limit:缓冲区数组中不可操作的下一个元素的位置:limit<=capacity byteBuf.flip(); byte[] field = new byte[byteBuf.limit()]; // 将 byteBuf 中的信息写入 field 中 byteBuf.get(field); byteBuf.clear(); // 将 field 放入 fields fields.add(field); } /* 3 写入 EOF packet */ EOFPacket eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; // 将 EOFPacket 的数据写入 byteBuf byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); byte[] eof = new byte[byteBuf.limit()]; // 将 byteBuf 中的信息写入 eof 中 byteBuf.get(eof); byteBuf.clear(); this.respHandler.fieldEofResponse(header, fields, eof, this); /* 4 写入 Row Data packet */ // output row while (rs.next()) { ResultSetMetaData resultSetMetaData = rs.getMetaData(); int size = resultSetMetaData.getColumnCount(); StringBuilder builder = new StringBuilder(); for (int i = 1; i <= size; i++) { builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i)); if (i < size) { builder.append(", "); } } LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString()); RowDataPacket curRow = new RowDataPacket(colunmCount); for (int i = 0; i < colunmCount; i++) { int j = i + 1; if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) { curRow.add(rs.getBytes(j)); } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte // ensure that do not use scientific notation format BigDecimal val = rs.getBigDecimal(j); curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset())); } else { curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset())); } } curRow.packetId = ++packetId; // 将 RowDataPacket 的数据写入 byteBuf byteBuf = curRow.write(byteBuf, sc, false); byteBuf.flip(); byte[] row = new byte[byteBuf.limit()]; byteBuf.get(row); byteBuf.clear(); this.respHandler.rowResponse(row, this); } fieldPks.clear(); // end row /* 5 写入 EOF packet */ eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); eof = new byte[byteBuf.limit()]; byteBuf.get(eof); sc.recycle(byteBuf); this.respHandler.rowEofResponse(eof, this); } finally { if (rs != null) { try { rs.close(); } catch (SQLException e) { } } if (stmt != null) { try { stmt.close(); } catch (SQLException e) { } } } }
四、Mycat 初始化
Mycat 初始化主要负责启动 MycatServer 实例,启动 MycatServer 实例的过程中,核心工作是读取并解析 Mycat 配置文件(schema.xml、rule.xml 和 server.xml)。MycatServer 使用 “饥饿模式” 初始化一个单例
public class MycatServer {
private static final MycatServer INSTANCE = new MycatServer();
public static final MycatServer getInstance() {
return INSTANCE;
}
private MycatServer() {
// 读取文件配置
this.config = new MycatConfig();
...
}
}
读取并解析 Mycat 配置文件的具体实现交由 MycatConfig。MycatConfig 内部使用 ConfigInitializer 解析全局配置。ConfigInitializer 主要处理一下几件事情:
- 读取 schema.xml、rule.xml 和 server.xml 文件并将解析到的配置类赋给 ConfigInitializer 的变量中
- 解析 DataHost 和对应的 DataNode,创建物理数据库连接池(PhysicalDBPool)和物理数据库节点(PhysicalDBNode)
- 权限管理设置
- 加载全局序列处理器配置
- 配置文件自检
在此我重点叙述 1 和 2
4.1 配置文件读取
// 读取 rule.xml和schema.xml
SchemaLoader schemaLoader = new XMLSchemaLoader();
// 读取 server.xml
XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
4.2 创建物理数据库连接池(PhysicalDBPool)
initDataHosts 为每一个 <dataHost> 节点创建一个数据库连接池,创建完成后返回 Map<String, PhysicalDBPool> physicalDBPoolMap,其中 key 为 <dataHost> 节点的 name 属性值,value 为 <dataHost> 节点对应的数据库连接池
private Map<String, PhysicalDBPool> initDataHosts(ConfigLoader configLoader) {
Map<String, DataHostConfig> dataHostConfigMap = configLoader.getDataHosts();
Map<String, PhysicalDBPool> physicalDBPoolMap = new HashMap<>(dataHostConfigMap.size());
for (DataHostConfig dataHostConfig : dataHostConfigMap.values()) {
// 为每个 dataHost 节点建立一个 PhysicalDBPool
PhysicalDBPool pool = getPhysicalDBPool(dataHostConfig, configLoader);
physicalDBPoolMap.put(pool.getHostName(), pool);
}
return physicalDBPoolMap;
}
io.mycat.config.ConfigInitializer#getPhysicalDBPool 方法为每个 <dataHost> 节点建立一个 PhysicalDBPool。主要工作如下:
- 为每一个 <dataHost> 节点的 <writeHost> 节点创建一个 PhysicalDatasource
- 为每一个 <dataHost> 节点的 <readHost> 节点创建一个 PhysicalDatasource
- 初始化 PhysicalDBPool 并返回
private PhysicalDBPool getPhysicalDBPool(DataHostConfig dataHostConfig, ConfigLoader configLoader) {
// dataHost 节点名
String name = dataHostConfig.getName();
// 数据库类型,我们这里只讨论MySQL
String dbType = dataHostConfig.getDbType();
// 连接数据库驱动,我们这里只讨论 MyCat 自己实现的 native
String dbDriver = dataHostConfig.getDbDriver();
// 1 为每一个 <dataHost> 节点的 <writeHost> 节点创建一个 PhysicalDatasource
PhysicalDatasource[] writeSources = createDataSource(dataHostConfig, name, dbType, dbDriver, dataHostConfig.getWriteHosts(), false);
Map<Integer, DBHostConfig[]> readHostsMap = dataHostConfig.getReadHosts();
Map<Integer, PhysicalDatasource[]> readSourcesMap = new HashMap<Integer, PhysicalDatasource[]>(readHostsMap.size());
// 对于每个读节点建立 key 为 writeHost 下标 value 为 readHost 的 PhysicalDatasource[] 的哈希表
for (Map.Entry<Integer, DBHostConfig[]> entry : readHostsMap.entrySet()) {
// 2 为每一个 <dataHost> 节点的 <readHost> 节点创建一个 PhysicalDatasource
PhysicalDatasource[] readSources = createDataSource(dataHostConfig, name, dbType, dbDriver, entry.getValue(), true);
readSourcesMap.put(entry.getKey(), readSources);
}
// 3 初始化 PhysicalDBPool 并返回
PhysicalDBPool pool = new PhysicalDBPool(dataHostConfig.getName(), dataHostConfig, writeSources, readSourcesMap, dataHostConfig.getBalance(), dataHostConfig.getWriteType());
pool.setSlaveIDs(dataHostConfig.getSlaveIDs());
return pool;
}
io.mycat.config.ConfigInitializer#createDataSource 完成具体的数据源创建。根据不同的 dvType 和 dbDriver 创建不同的 PhysicalDatasource:
- dvType == mysql && dbDriver == native --> MySQLDataSource
- dvType == mysql && dbDriver == jdbc --> JDBCDataSource
- dvType == postgresql && dbDriver == native --> PostgreSQLDataSource
private PhysicalDatasource[] createDataSource(DataHostConfig dataHostConfig, String hostName, String dbType, String dbDriver, DBHostConfig[] nodes, boolean isRead) {
PhysicalDatasource[] dataSources = new PhysicalDatasource[nodes.length];
if ("mysql".equals(dbType) && "native".equals(dbDriver)) {
for (int i = 0; i < nodes.length; i++) {
//设置最大 idle 时间,默认为 30 分钟(可自定义)
nodes[i].setIdleTimeout(system.getIdleTimeout());
MySQLDataSource ds = new MySQLDataSource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else if ("jdbc".equals(dbDriver)) {
for (int i = 0; i < nodes.length; i++) {
nodes[i].setIdleTimeout(system.getIdleTimeout());
JDBCDatasource ds = new JDBCDatasource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else if ("postgresql".equalsIgnoreCase(dbType) && dbDriver.equalsIgnoreCase("native")) {
for (int i = 0; i < nodes.length; i++) {
nodes[i].setIdleTimeout(system.getIdleTimeout());
PostgreSQLDataSource ds = new PostgreSQLDataSource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else {
throw new ConfigException("not supported yet !" + hostName);
}
return dataSources;
}
4.3 创建物理数据库节点(PhysicalDBNode)
io.mycat.config.ConfigInitializer#initDataNodes 为每个 <dataNode> 节点创建一个 PhysicalDBNode,根据 <dataNode> 节点的 dataHost 属性值从 Map<String, PhysicalDBPool> dataHosts 中找到 <dataNode> 对应的连接池,并赋予 PhysicalDBNode
private Map<String, PhysicalDBNode> initDataNodes(ConfigLoader configLoader) {
Map<String, PhysicalDBNode> nodes = new HashMap<String, PhysicalDBNode>(dataNodeConfigMap.size());
Map<String, DataNodeConfig> dataNodeConfigMap = configLoader.getDataNodes();
for (DataNodeConfig dataNodeConfig : dataNodeConfigMap.values()) {
// 根据 dataHost 名称获取对应的 PhysicalDBPool
PhysicalDBPool pool = this.dataHosts.get(dataNodeConfig.getDataHost());
PhysicalDBNode dataNode = new PhysicalDBNode(dataNodeConfig.getName(), dataNodeConfig.getDatabase(), pool);
nodes.put(dataNode.getName(), dataNode);
}
return nodes;
}
五、mycat 的连接池模型
Mycat 为了最高效的利用后端的 MySQL 连接,采取了不同于 Cobar 也不同于传统 JDBC 连接池的做法,传统的做法是基于 Database 的连接池,即一个 MySQL 服务器上有 5 个 Database,则每个 Database 独占最大200 个连接。这种模式的最大问题在于,将一个数据库所具备的最大 1000 个连接,隔离成了更新小的连接池,于是可能产生一个应用的连接不够,但其他应用的连接却很空闲的资源浪费情况,而对于分片这种场景,这个缺陷则几乎是致命的,因为每个分片所对应的 Database 的连接数量被限制在了一个很小的范围内,从而导致系统并发能力的大幅降低。而 Mycat 则采用了基于 MySQL 实例的连接池模式,每个 Database 都可以用现有的 1000 个连接中的空闲连接
5.1 核心对象
5.1.1 ConMap 和 ConQueue
在 Mycat 的连接池里,当前可用的、空闲的 MySQL 连接是放到一个 ConcurrentHashMap 的数据结构里,Key 为当前连接对应的 Database,另外还有二级分类 ConQueue,按照连接是自动提交还是手动提交模式进行区分,这个设计是为了高效的查询匹配的可用连接。ConMap 和 ConQueue 包含的关键对象有:
- ConcurrentHashMap<String, ConQueue> items:可用的 MySQL 连接容器,key 为当前连接对应的 database 名,value 为 ConQueue 对象,里面包含了两个存储数据库连接的队列
- ConcurrentLinkedQueue<BackendConnection> autoCommitCons:自动提交的数据库连接
- ConcurrentLinkedQueue<BackendConnection> manCommitCons:手动提交的数据库连接
public class ConMap {
/**
* key:当前连接对应的 Database
* ConQueue:数据库连接队列(按照连接是自动提交还是手动提交模式进行区分,这个设计是为了高效的查询匹配的可用连接)
*/
private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>();
}
public class ConQueue {
private final ConcurrentLinkedQueue<BackendConnection> autoCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private final ConcurrentLinkedQueue<BackendConnection> manCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private long executeCount;
}
BackendConnection 为后端数据库连接,其实现有 JDBCConnection、MySQLConnection 等
5.1.2 PhysicalDatasource
对应于 <dataHost> 节点下的 <writeHost> 或 <readHost> 子节点,表示一个物理数据库实例。每个数据库实例中保存了多个可用的数据库连接(BackendConnection),mycat 初始化时,根据 <dataHost> 节点的 minCon
属性值初始化多个可用的数据库连接。其关键对象有:
- size:读或写连接池的最大连接数
- conMap:存放当前可用的数据库连接
public abstract class PhysicalDatasource {
private final String name;
private final int size;
private final DBHostConfig config;
private final ConMap conMap = new ConMap();
private final boolean readNode;
private final DataHostConfig hostConfig;
private PhysicalDBPool dbPool;
}
PhysicalDatasource 的实现类有:
5.1.3 PhysicalDBPool
对应于 <dataHost name="localhost1" > 节点,表示物理数据库实例池。由于 <datahost> 节点可包含多个 <writeHost> 节点,因此 PhysicalDBPool 可以包含多个物理数据库实例,其关键对象有:
- hostName:<dataHosr> 标签的 name 属性
- writeSources 和 readSources:可写和可读的多个物理数据库实例,对应于 <writeHost> 和 <readHost>
- activedIndex:表明了当前是哪个写节点的数据源在生效
public class PhysicalDBPool {
private final String hostName;
protected PhysicalDatasource[] writeSources;
protected Map<Integer, PhysicalDatasource[]> readSources;
protected volatile int activedIndex;
private final DataHostConfig dataHostConfig;
}
5.1.4 PhysicalDBNode
对应于 <dataNode /> 节点,表示一个数据库分片,PhysicalDBNode 包含的关键对象有:
- name:dataNode 名称,对应于 <dataNode> 标签的 name 属性
- database:数据库名称,对应于 <dataNode> 标签的 database 属性
- PhysicalDBPool dbPool:MySQL 连接池,里面包含了多个数据库实例 PhysicalDatasource,并将其按照读节点和写节点分类,实现读写分类和节点切换的功能。其中 activedIndex 属性表明了当前是哪个写节点的数据源在生效
public class PhysicalDBNode {
protected final String name;
protected final String database;
protected final PhysicalDBPool dbPool;
}
若 schema.xml 中配置了一下分片节点:
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
当某个用户会话需要一个自动提交的,到分片 <dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/> 的 SQL 连接的时候,分片节点 dn1 首先在连接池 dbPool 中查找是否有数据库 db_demo_01 (对应于 PhysicalDatasource)上的可用连接,若有则看是否有自动提交模式的连接,找到就返回,否则返回 db_demo_01 上的手动提交模式的连接;若没有 db_demo_01 的可用连接,则随机返回一个其他数据库(db_demo_02 或 db_demo_03)对应的可用连接;若没有其他数据库也没有可用连接,并且连接池还没达到上限,则创建一个新连接并返回
上述获取数据库连接的逻辑有一种情况是:用户会话得到的数据库连接可能不是来自于 db_demo_01 的,因此在执行具体的 SQL 之前,还有一个自动同步数据库连接的过程:包括事务隔离级别、事务模式、字符集、database 等四个指标。同步完成以后,才会执行具体的 SQL 请求
通过共享一个 MySQL 上的所有数据库的可用连接,并结合连接状态同步的特性,MyCAT 的连接池做到了最佳的吞吐量,也在一定程度上提升了整个系统的并发支撑能力
5.2 创建数据库连接
5.2.1 创建新连接时机
创建新数据库连接的方法为 PhysicalDatasource#createNewConnection(io.mycat.backend.mysql.nio.handler.ResponseHandler, java.lang.Object, java.lang.String),其有两个触发时机:
-
io.mycat.backend.datasource.PhysicalDatasource#createByIdleLitte
执行空闲检测时触发,若当前数据库连接总数(空闲连接数和活动链接数之和)小于连接池的最大连接数,且空闲连接数小于连接池最小连接数,则调用 PhysicalDatasource#createByIdleLitte 方法创建新数据库连接
if ((createCount > 0) && (idleCons + activeCons < size) && (idleCons < hostConfig.getMinCon())) { createByIdleLitte(idleCons, createCount); }
-
io.mycat.backend.datasource.PhysicalDatasource#getConnection
首先调用 ConMap#tryTakeCon(java.lang.String, boolean) 当前 database 上是否有可用连接,若有则立即返回,否则从其他的 database 上找一个可用连接返回。若 ConMap#tryTakeCon 返回 null,表示数据库连接池中没有空闲连接,则调用 PhysicalDatasource#createNewConnection 创建新连接
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException { // 从当前连接 map 中拿取已建立好的后端连接 BackendConnection con = this.conMap.tryTakeCon(schema, autocommit); if (con != null) { //如果不为空,则绑定对应前端请求的handler takeCon(con, handler, attachment, schema); } else { long activeCons = increamentCount.longValue() + totalConnectionCount; if (activeCons < size) { createNewConnection(handler, attachment, schema); } else { LOGGER.error("the max activeConnnections size can not be max than maxconnections"); throw new IOException("the max activeConnnections size can not be max than maxconnections"); } } }
5.2.2 创建新数据库连接
MycatServer#startup 方法里其中一件事情就是初始化 PhysicalDBPool
public void startup() throws IOException {
...
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
for (PhysicalDBPool physicalDBPool : dataHosts.values()) {
String index = dnIndexProperties.getProperty(physicalDBPool.getHostName(), "0");
physicalDBPool.init(Integer.parseInt(index));
physicalDBPool.startHeartbeat();
}
...
}
physicalDBPool.init(Integer.parseInt(index)) 中 调用 PhysicalDBPool#initSource 方法,该方法对每一个 PhysicalDatasource 调用 getConnection 方法创建新的数据库连接
public void init(int index, String reason) {
for (int i = 0; i < writeSources.length; i++) {
int j = loop(i + index);
initSource(j, writeSources[j])
}
}
private boolean initSource(int index, PhysicalDatasource physicalDatasource) {
int initSize = physicalDatasource.getConfig().getMinCon();
CopyOnWriteArrayList<BackendConnection> list = new CopyOnWriteArrayList<>();
GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize);
for (int i = 0; i < initSize; i++) {
try {
physicalDatasource.getConnection(this.schemas[i % schemas.length], true, getConHandler, null);
} catch (Exception e) {
LOGGER.warn(getMessage(index, " init connection error."), e);
}
}
...
}
PhysicalDatasource#createNewConnection 方法新建一个线程异步执行创建数据库连接的操作,每个线程通过调用抽象方法来进行具体的创建逻辑。
private void createNewConnection(final ResponseHandler handler, final Object attachment, final String schema) throws IOException {
// aysn create connection
final AtomicBoolean hasError = new AtomicBoolean(false);
MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {
@Override
public void run() {
try {
createNewConnection(new DelegateResponseHandler(handler) {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, conn);
} else {
LOGGER.info("connection connectionError ");
}
}
@Override
public void connectionAcquired(BackendConnection conn) {
LOGGER.info("connection id is " + conn.getId());
takeCon(conn, handler, attachment, schema);
}
}, schema);
} catch (IOException e) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, null);
} else {
LOGGER.info("connection connectionError ");
}
}
}
});
}
每个继承 PhysicalDatasource 的数据源对象自己实现如下抽象方法:
public abstract void createNewConnection(ResponseHandler handler, String schema) throws IOException;
我们以 JDBCDatasource 为例,其实现的 createNewConnection 方法实现代码如下:
public class JDBCDatasource extends PhysicalDatasource {
@Override
public void createNewConnection(ResponseHandler handler, String schema) throws IOException {
DBHostConfig cfg = getConfig();
JDBCConnection jdbcConnection = new JDBCConnection();
jdbcConnection.setHost(cfg.getIp());
jdbcConnection.setPort(cfg.getPort());
jdbcConnection.setJdbcDatasource(this);
jdbcConnection.setSchema(schema);
jdbcConnection.setDbType(cfg.getDbType());
// 复用mysql的Backend的ID,需要在process中存储
jdbcConnection.setId(NIOConnector.ID_GENERATOR.getId());
NIOProcessor processor = MycatServer.getInstance().nextProcessor();
jdbcConnection.setProcessor(processor);
processor.addBackend(jdbcConnection);
try {
Connection con = getConnection();
jdbcConnection.setCon(con);
// notify handler
handler.connectionAcquired(jdbcConnection);
} catch (Exception e) {
handler.connectionError(e, jdbcConnection);
}
}
}
主要做了一下几件事:
实例化一个 JDBCConnection,设置相关参数
-
调用 JDBCDatasource#getConnection 获取 Connection
JDBCDatasource#getConnection 直接使用 DriverManager 创建一个新连接并返回
public Connection getConnection() throws SQLException { DBHostConfig cfg = getConfig(); Connection connection = DriverManager.getConnection(cfg.getUrl(), cfg.getUser(), cfg.getPassword()); return connection; }
-
调用 DelegateResponseHandler#connectionAcquired,作为已获得有效连接的响应处理
@Override public void connectionAcquired(BackendConnection conn) { LOGGER.info("connection id is " + conn.getId()); takeCon(conn, handler, attachment, schema); }
ResponseHandler#connectionAcquired 调用 PhysicalDatasource#takeCon 方法进行相应处理,代码如下:
private BackendConnection takeCon(BackendConnection conn, final ResponseHandler handler, final Object attachment, String schema) { // 该连接已被借用 conn.setBorrowed(true); if (!conn.getSchema().equals(schema)) { // need do schema syn in before sql send conn.setSchema(schema); } ConQueue queue = conMap.getSchemaConQueue(schema); queue.incExecuteCount(); conn.setAttachment(attachment); // 每次取连接的时候,更新下 lasttime,防止在前端连接检查的时候,关闭连接,导致sql执行失败 conn.setLastTime(System.currentTimeMillis()); handler.connectionAcquired(conn); return conn; }
主要做了一下几件事:
设置连接已被借用
获取 ConQueue,增加可以执行连接的数量
-
调用 ResponseHandler#connectionAcquired,具体实现见 GetConnectionHandler#connectionAcquired,该方法调用 BackendConnection#release 释放连接,调用 PhysicalDatasource#returnCon 方法将释放的连接放回 ConMap 的 ConQueue 中
@Override public void connectionAcquired(BackendConnection conn) { successCons.add(conn); finishedCount.addAndGet(1); logger.info("connected successfully " + conn); conn.release(); } @Override public void JDBCConnection#release() { jdbcDatasource.releaseChannel(this); } public void PhysicalDatasource#releaseChannel(BackendConnection c) { returnCon(c); } private void PhysicalDatasource#returnCon(BackendConnection c) { c.setAttachment(null); c.setBorrowed(false); c.setLastTime(TimeUtil.currentTimeMillis()); ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema()); boolean ok = false; if (c.isAutocommit()) { ok = queue.getAutoCommitCons().offer(c); } else { ok = queue.getManCommitCons().offer(c); } // 若无法放入 ConQueue 则将连接关闭 if (!ok) { LOGGER.warn("can't return to pool ,so close con " + c); c.close("can't return to pool "); } }
5.2.3 总结
Mycat 服务启动时调用 MycatServer 的 startUp 方法对每一个 <dataHost> 节点的多个 <writeHost> 节点对应的数据源做初始化工作。初始创建数据库连接数由 <dataHost> 节点的minCon 属性值决定。每创建一个 BackendConnection 便回调 GetConnectionHandler#connectionAcquired 将新生成的 connection 的 borrowed 属性设置为 false(该属性个人理解是标记连接是否空闲),然后将connection 保存于 ConQueue 中
因此一个 <dataHost> 节点对应一个 PhysicalDBPool
,PhysicalDBPool
类中的 PhysicalDatasource[] writeSources
对应于 <dataHost> 节点下多个 <writeHost> 节点。每个 PhysicalDatasource
中持有一个 ConMap conMap
作为数据源的连接池, 里面存放着可用的数据库连接 BackendConnection
Mycat 根据 <dataNode> 节点的 dataHost 属性和 database 属性,将数据库连接均匀得分配给在同一个 dataHost 中的不同数据库。例如对于以下配置:
<!-- 分片节点 -->
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
<!-- 节点主机 -->
<dataHost name="localhost1" maxCon="10000" minCon="100" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&autoReconnect=true" user="root" password="root">
</writeHost>
</dataHost>
Mycat 会初始化 100 个(minCon="100")数据库连接,并将这 100 个连接均分给 db_demo_01、db_demo_02 和 db_demo_03,如下图所示
5.3 获取数据库连接
本节主要讲述获取可用的数据库连接用于执行客户端的 SQL 请求
5.3.1 涉及的核心类
- NIOAcceptor:负责处理 Accept 事件,即 MyCAT 作为服务端去处理前端业务程序发过来的连接请求
- ServerConnectionFactory:客户端和 Mycat 连接工厂,用于创建客户端连接
- ServerConnection:客户端连接(客户端和 Mycat 之间的连接)
- NonBlockingSession:客户端连接和后端数据库连接的会话,其核心对象有:
- ServerConnection serverConnection:客户端连接
- ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap:存放路由节点和对应的数据库连接的容器
- SingleNodeHandler singleNodeHandler:单路由节点请求处理器
- MultiNodeQueryHandler multiNodeHandler:多路由节点请求处理器
- SingleNodeHandler(MultiNodeQueryHandler):路由节点请求处理器,其核心对象有:
- RouteResultSetNode routeResultSetNode
- RouteResultset rrs
- NonBlockingSession session
- PhysicalDBNode
- PhysicalDatasource
5.3.2 获取过程解析
NIOAcceptor 在接受到前端发来的连接请求后,会调用 ServerConnectionFactory 实例化一个 ServerConnection,并实例化一个 NonBlockingSession 赋予 ServerConnection
public class ServerConnectionFactory extends FrontendConnectionFactory {
@Override
protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
SystemConfig systemConfig = MycatServer.getInstance().getConfig().getSystem();
// 将 channel 包装为一个 ServerConnection
ServerConnection serverConnection = new ServerConnection(channel);
// 设置客户端查询处理器
serverConnection.setQueryHandler(new ServerQueryHandler(serverConnection));
// 设置客户端和 Mycat 的一个会话 session
serverConnection.setSession2(new NonBlockingSession(serverConnection));
...
return serverConnection;
}
}
前端 SQL 请求进来之后,Mycat 调用 ServerConnection#routeEndExecuteSQL 进行路由计算并得到路由结果 RouteResultset,然后调用 NonBlockingSession#execute 进行处理,若 RouteResultset 中包含多个路由节点,则调用 MultiNodeQueryHandler 的 execute 方法;若 RouteResultset 只包含单个路由节点,则调用 SingleNodeHandler 的 execute 方法。此处我们假设是单个路由节点
public class NonBlockingSession implements Session {
@Override
public void execute(RouteResultset routeResultset, int type) {
RouteResultSetNode[] nodes = routeResultset.getNodes();
if (nodes.length == 1) {
// 实例化一个 SingleNodeHandler 对象
singleNodeHandler = new SingleNodeHandler(routeResultset, this);
singleNodeHandler.execute();
}
}
}
SingleNodeHandler#execute 首先通过 session 获取客户端连接 ServerConnection 以及后端数据库连接 BackendConnection。第一次获取 BackendConnection 时由于 session 还没有将 routeResultSetNode 和 BackendConnection 绑定,故 backendConnection 返回 null,SingleNodeHandler#execute 要调用 PhysicalDBNode#getConnection 创建一个新的数据库连接,并将其绑定到 session 中
public class SingleNodeHandler implements ResponseHandler {
public void execute() throws Exception {
// 通过 session 拿到客户端连接 ServerConnection
ServerConnection serverConnection = session.getServerConnection();
// 通过 session 拿到后端数据库连接
final BackendConnection backendConnection = session.getTarget(routeResultSetNode);
// 若存在 routeResultsetNode 对应的 BackendConnection
if (session.tryExistsCon(backendConnection, routeResultSetNode)) {
_execute(backendConnection);
} else { // 若不存在 routeResultsetNode 对应的 BackendConnection,则创建新的连接
MycatConfig conf = MycatServer.getInstance().getConfig();
PhysicalDBNode dn = conf.getDataNodes().get(routeResultSetNode.getName());
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
}
}
}
PhysicalDBNode#getConnection 从分片节点 dataNode 的数据库连接池中获取一个可写的 PhysicalDatasource,并调用 PhysicalDatasource#getConnection 从 ConMap 中获取一个可用的数据库连接
public class PhysicalDBNode {
public void getConnection(String schema, boolean autoCommit, RouteResultSetNode routeResultSetNode, ResponseHandler handler, Object attachment) throws Exception {
// 从分片节点 dataNode 的数据库连接池中获取一个可写的 PhysicalDatasource
PhysicalDatasource writeSource = dbPool.getSource();
writeSource.getConnection(schema, autoCommit, handler, attachment);
}
}
PhysicalDatasource#getConnection 从 ConMap 中获取一个可用的数据库连接后,调用 PhysicalDatasource#takeCon 将获取的 con 标记为已用(borrowed = true)
public class PhysicalDatasource {
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment)
throws IOException {
// 从 conMap 中拿取已建立好的后端连接
BackendConnection con = this.conMap.tryTakeCon(schema, autocommit);
if (con != null) {
takeCon(con, handler, attachment, schema);
}
}
private BackendConnection takeCon(BackendConnection backendConnection, final ResponseHandler handler, final Object attachment, String schema) {
// 标记该连接为已用
backendConnection.setBorrowed(true);
handler.connectionAcquired(backendConnection);
return backendConnection;
}
}
之后调用 ResponseHandler#connectionAcquired 进行连接获取后的确认逻辑,此处调用的实际实现类为 SingleNodeHandler,其在创建数据库连接时将自己作为 ResponseHandler 传入 PhysicalDBNode#getConnection 中
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
SingleNodeHandler 本身实现了 ResponseHandler 接口,并实现了 connectionAcquired 方法,具体代码如下:
public class SingleNodeHandler implements ResponseHandler {
public void connectionAcquired(final BackendConnection backendConnection) {
// 将 routeResultsetNode 对应的后端连接记录在 session 的 backendConnectionMap 中
session.bindConnection(routeResultSetNode, backendConnection);
_execute(backendConnection);
}
}
因此,PhysicalDatasource#getConnection 从 ConMap 中获取一个可用的数据库连接后,首先将该连接标记为已用(borrowed = true),然后将 backendConnection 和对应的路由节点 node 绑定到 session 的 backendConnectionMap 中,最后调用 _execute(backendConnection)
最进一步执行请求处理,具体客户端 SQL 请求执行逻辑参见:三、客户端 SQL 请求执行流程
5.3.3 总结
当客户端发送 SQL 请求至 Mycat 时,Mycat 首先在 NonBlockingSession 的 ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap
中查找是否存在 SQL 的路由节点 RouteResultSetNode 对应的 BackendConnection,若存在则返回,继续执行后续操作;若不存在,则从 PhysicalDatasource 的 ConMap 中获取一个可用的数据库连接 BackendConnection,并将其标记为已用(BackendConnection 的 borrowed 属性设置为 true),然后将 BackendConnection 和 RouteResultSetNode 注册于 NonBlockingSession 的 backendConnectionMap 中
5.4 释放已用的数据库连接
5.5 关闭数据库连接
附录
附录1 客户端命令请求报文命令类型详情表
类型值 | 命令 | 功能 | 关联函数 |
---|---|---|---|
0x00 | COM_SLEEP | (内部线程状态) | (无) |
0x01 | COM_QUIT | 关闭连接 | mysql_close |
0x02 | COM_INIT_DB | 切换数据库 | mysql_select_db |
0x03 | COM_QUERY | SQL查询请求 | mysql_real_query |
0x04 | COM_FIELD_LIST | 获取数据表字段信息 | mysql_list_fields |
0x05 | COM_CREATE_DB | 创建数据库 | mysql_create_db |
0x06 | COM_DROP_DB | 删除数据库 | mysql_drop_db |
0x07 | COM_REFRESH | 清除缓存 | mysql_refresh |
0x08 | COM_SHUTDOWN | 停止服务器 | mysql_shutdown |
0x09 | COM_STATISTICS | 获取服务器统计信息 | mysql_stat |
0x0A | COM_PROCESS_INFO | 获取当前连接的列表 | mysql_list_processes |
0x0B | COM_CONNECT | (内部线程状态) | (无) |
0x0C | COM_PROCESS_KILL | 中断某个连接 | mysql_kill |
0x0D | COM_DEBUG | 保存服务器调试信息 | mysql_dump_debug_info |
0x0E | COM_PING | 测试连通性 | mysql_ping |
0x0F | COM_TIME | (内部线程状态) | (无) |
0x10 | COM_DELAYED_INSERT | (内部线程状态) | (无) |
0x11 | COM_CHANGE_USER | 重新登陆(不断连接) | mysql_change_user |
0x12 | COM_BINLOG_DUMP | 获取二进制日志信息 | (无) |
0x13 | COM_TABLE_DUMP | 获取数据表结构信息 | (无) |
0x14 | COM_CONNECT_OUT | (内部线程状态) | (无) |
0x15 | COM_REGISTER_SLAVE | 从服务器向主服务器进行注册 | (无) |
0x16 | COM_STMT_PREPARE | 预处理SQL语句 | mysql_stmt_prepare |
0x17 | COM_STMT_EXECUTE | 执行预处理语句 | mysql_stmt_execute |
0x18 | COM_STMT_SEND_LONG_DATA | 发送BLOB类型的数据 | mysql_stmt_send_long_data |
0x19 | COM_STMT_CLOSE | 销毁预处理语句 | mysql_stmt_close |
0x1A | COM_STMT_RESET | 清除预处理语句参数缓存 | mysql_stmt_reset |
0x1B | COM_SET_OPTION | 设置语句选项 | mysql_set_server_option |
0x1C | COM_STMT_FETCH | 获取预处理语句的执行结果 | mysql_stmt_fetch |