MyCat 为了最高效的利用后端的 MySQL 连接,采取了不同于 Cobar 也不同于传统 JDBC 连接池的做法,传统的做法是基于 Database 的连接池,即一个 MySQL 服务器上有 5 个 Database,则每个 Database 独占最大200 个连接。这种模式的最大问题在于,将一个数据库所具备的最大 1000 个连接,隔离成了更新小的连接池,于是可能产生一个应用的连接不够,但其他应用的连接却很空闲的资源浪费情况,而对于分片这种场景,这个缺陷则几乎是致命的,因为每个分片所对应的 Database 的连接数量被限制在了一个很小的范围内,从而导致系统并发能力的大幅降低。而 Mycat 则采用了基于 MySQL 实例的连接池模式,每个 Database 都可以用现有的 1000 个连接中的空闲连接
5.1 核心对象
5.1.1 ConMap 和 ConQueue
在 MyCat 的连接池里,当前可用的、空闲的 MySQL 连接是放到一个 ConcurrentHashMap
的数据结构里,Key 为当前连接对应的 database 名,另外还有二级分类 ConQueue
,按照连接是自动提交模式还是手动提交模式进行区分,这个设计是为了高效的查询匹配的可用连接。ConMap
和 ConQueue
包含的关键对象有:
-
ConcurrentHashMap<String, ConQueue> items
:可用的 MySQL 连接容器,key 为当前连接对应的 database 名,value 为ConQueue
对象,里面包含了两个存储数据库连接的队列 -
ConcurrentLinkedQueue<BackendConnection> autoCommitCons
:自动提交的数据库连接 -
ConcurrentLinkedQueue<BackendConnection> manCommitCons
:手动提交的数据库连接
public class ConMap {
/**
* key:当前连接对应的 Database
* ConQueue:数据库连接队列(按照连接是自动提交模式还是手动提交模式进行区分,这个设计是为了高效的查询匹配的可用连接)
*/
private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>();
}
public class ConQueue {
private final ConcurrentLinkedQueue<BackendConnection> autoCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private final ConcurrentLinkedQueue<BackendConnection> manCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private long executeCount;
}
BackendConnection
为后端数据库连接,其实现有JDBCConnection
、MySQLConnection
等
5.1.2 PhysicalDatasource
对应于 <dataHost>
节点下的 <writeHost>
或 <readHost>
子节点,表示一个物理数据库实例。每个数据库实例中保存了多个可用的数据库连接(BackendConnection
),MyCat 初始化时,根据 <dataHost>
节点的 minCon
属性值初始化多个可用的数据库连接。其关键对象有:
-
name
:<writeHost>
的 host 属性值 -
size
:读或写连接池的最大连接数 -
conMap
:存放当前可用的数据库连接 -
DataHostConfig
:<dataHost>
节点对应的配置 -
DBHostConfig
:<writeHost>
节点配置
public abstract class PhysicalDatasource {
private final String name;
private final int size;
private final DBHostConfig config;
private final ConMap conMap = new ConMap();
private final boolean readNode;
private final DataHostConfig hostConfig;
private PhysicalDBPool dbPool;
}
PhysicalDatasource
的实现类有:
5.1.3 PhysicalDBPool
对应于 <dataHost name="localhost1" >
节点,表示物理数据库实例池。由于 <datahost>
节点可包含多个 <writeHost>
节点,因此 PhysicalDBPool
可以包含多个物理数据库实例,其关键对象有:
-
hostName
:<dataHost>
标签的 name 属性 -
writeSources
和readSources
:可写和可读的多个物理数据库实例,对应于<writeHost>
和<readHost>
-
activedIndex
:表明了当前是哪个写节点的数据源在生效
public class PhysicalDBPool {
private final String hostName;
protected PhysicalDatasource[] writeSources;
protected Map<Integer, PhysicalDatasource[]> readSources;
protected volatile int activedIndex;
private final DataHostConfig dataHostConfig;
}
5.1.4 PhysicalDBNode
对应于 <dataNode />
节点,表示一个数据库分片,PhysicalDBNode
包含的关键对象有:
-
name
:dataNode 名称,对应于<dataNode>
标签的 name 属性 -
database
:数据库名称,对应于<dataNode>
标签的 database 属性 -
dbPool
:MySQL 连接池,里面包含了多个数据库实例PhysicalDatasource
,并将其按照读节点和写节点分类,实现读写分类和节点切换的功能。其中activedIndex
属性表明了当前是哪个写节点的数据源在生效
public class PhysicalDBNode {
protected final String name;
protected final String database;
protected final PhysicalDBPool dbPool;
}
若 schema.xml 中配置了一下分片节点:
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
当某个用户会话需要一个自动提交的,到分片 <dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
的 SQL 连接的时候,分片节点 dn1
首先在连接池 dbPool
中查找是否有数据库 db_demo_01
(对应于 PhysicalDatasource
)上的可用连接,若有则看是否有自动提交模式的连接,找到就返回,否则返回 db_demo_01
上的手动提交模式的连接;若没有 db_demo_01
的可用连接,则随机返回一个其他数据库(db_demo_02
或 db_demo_03
)对应的可用连接;若没有其他数据库也没有可用连接,并且连接池还没达到上限,则创建一个新连接并返回
上述获取数据库连接的逻辑有一种情况是:用户会话得到的数据库连接可能不是来自于 db_demo_01
的,因此在执行具体的 SQL 之前,还有一个自动同步数据库连接的过程:包括事务隔离级别、事务模式、字符集、database 等四个指标。同步完成以后,才会执行具体的 SQL 请求
通过共享一个 MySQL 上的所有数据库的可用连接,并结合连接状态同步的特性,MyCat 的连接池做到了最佳的吞吐量,也在一定程度上提升了整个系统的并发支撑能力
5.2 创建数据库连接
5.2.1 创建新数据库连接时机
创建新数据库连接的方法为 PhysicalDatasource#createNewConnection(io.mycat.backend.mysql.nio.handler.ResponseHandler, java.lang.Object, java.lang.String)
,其有两个创建连接的触发时机:
-
io.mycat.backend.datasource.PhysicalDatasource#createByIdleLitte
执行空闲检测时触发,若当前数据库连接总数(空闲连接数和活动链接数之和)小于连接池的最大连接数,且空闲连接数小于连接池最小连接数,则调用
PhysicalDatasource#createByIdleLitte
方法创建新数据库连接if ((createCount > 0) && (idleCons + activeCons < size) && (idleCons < hostConfig.getMinCon())) { createByIdleLitte(idleCons, createCount); }
-
io.mycat.backend.datasource.PhysicalDatasource#getConnection
首先调用
ConMap#tryTakeCon(java.lang.String, boolean)
获取当前 database 的可用连接,若有则立即返回,否则从其他的 database 上找一个可用连接返回。若ConMap#tryTakeCon
返回 null,表示数据库连接池中没有空闲连接,则调用PhysicalDatasource#createNewConnection
创建新连接public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException { // 从当前连接 map 中拿取已建立好的后端连接 BackendConnection con = this.conMap.tryTakeCon(schema, autocommit); if (con != null) { //如果不为空,则绑定对应前端请求的 handler takeCon(con, handler, attachment, schema); } else { long activeCons = increamentCount.longValue() + totalConnectionCount; if (activeCons < size) { createNewConnection(handler, attachment, schema); } else { LOGGER.error("the max activeConnnections size can not be max than maxconnections"); throw new IOException("the max activeConnnections size can not be max than maxconnections"); } } }
5.2.2 创建新数据库连接
MycatServer#startup
方法里其中一件事情就是初始化 PhysicalDBPool
public void startup() throws IOException {
...
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
for (PhysicalDBPool physicalDBPool : dataHosts.values()) {
String index = dnIndexProperties.getProperty(physicalDBPool.getHostName(), "0");
physicalDBPool.init(Integer.parseInt(index));
physicalDBPool.startHeartbeat();
}
...
}
physicalDBPool.init(Integer.parseInt(index))
中 调用 PhysicalDBPool#initSource
方法,该方法对每一个 PhysicalDatasource
调用 getConnection
方法创建新的数据库连接
public void init(int index, String reason) {
for (int i = 0; i < writeSources.length; i++) {
int j = loop(i + index);
initSource(j, writeSources[j])
}
}
private boolean initSource(int index, PhysicalDatasource physicalDatasource) {
int initSize = physicalDatasource.getConfig().getMinCon();
CopyOnWriteArrayList<BackendConnection> list = new CopyOnWriteArrayList<>();
GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize);
for (int i = 0; i < initSize; i++) {
try {
physicalDatasource.getConnection(this.schemas[i % schemas.length], true, getConHandler, null);
} catch (Exception e) {
LOGGER.warn(getMessage(index, " init connection error."), e);
}
}
...
}
PhysicalDatasource#createNewConnection
方法新建一个线程异步执行创建数据库连接的操作,每个线程通过调用抽象方法来进行具体的创建逻辑。
private void createNewConnection(final ResponseHandler handler, final Object attachment, final String schema) throws IOException {
// aysn create connection
final AtomicBoolean hasError = new AtomicBoolean(false);
MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {
@Override
public void run() {
try {
createNewConnection(new DelegateResponseHandler(handler) {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, conn);
} else {
LOGGER.info("connection connectionError ");
}
}
@Override
public void connectionAcquired(BackendConnection conn) {
LOGGER.info("connection id is " + conn.getId());
takeCon(conn, handler, attachment, schema);
}
}, schema);
} catch (IOException e) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, null);
} else {
LOGGER.info("connection connectionError ");
}
}
}
});
}
每个继承 PhysicalDatasource
的数据源对象自己实现如下抽象方法:
public abstract void createNewConnection(ResponseHandler handler, String schema) throws IOException;
我们以 JDBCDatasource
为例,其实现的 createNewConnection
方法实现代码如下:
public class JDBCDatasource extends PhysicalDatasource {
@Override
public void createNewConnection(ResponseHandler handler, String schema) throws IOException {
DBHostConfig cfg = getConfig();
JDBCConnection jdbcConnection = new JDBCConnection();
jdbcConnection.setHost(cfg.getIp());
jdbcConnection.setPort(cfg.getPort());
jdbcConnection.setJdbcDatasource(this);
jdbcConnection.setSchema(schema);
jdbcConnection.setDbType(cfg.getDbType());
// 复用 mysql 的 Backend 的ID,需要在 process 中存储
jdbcConnection.setId(NIOConnector.ID_GENERATOR.getId());
NIOProcessor processor = MycatServer.getInstance().nextProcessor();
jdbcConnection.setProcessor(processor);
processor.addBackend(jdbcConnection);
try {
Connection con = getConnection();
jdbcConnection.setCon(con);
// notify handler
handler.connectionAcquired(jdbcConnection);
} catch (Exception e) {
handler.connectionError(e, jdbcConnection);
}
}
}
主要做了一下几件事:
实例化一个
JDBCConnection
,设置相关参数-
调用
JDBCDatasource#getConnection
获取Connection
JDBCDatasource#getConnection
直接使用DriverManager
创建一个新连接并返回public Connection getConnection() throws SQLException { DBHostConfig cfg = getConfig(); Connection connection = DriverManager.getConnection(cfg.getUrl(), cfg.getUser(), cfg.getPassword()); return connection; }
-
调用
DelegateResponseHandler#connectionAcquired
,作为已获得数据库连接的响应处理@Override public void connectionAcquired(BackendConnection conn) { LOGGER.info("connection id is " + conn.getId()); takeCon(conn, handler, attachment, schema); }
ResponseHandler#connectionAcquired
调用PhysicalDatasource#takeCon
方法进行相应处理,代码如下:private BackendConnection takeCon(BackendConnection conn, final ResponseHandler handler, final Object attachment, String schema) { // 设置数据库连接状态为已用 conn.setBorrowed(true); if (!conn.getSchema().equals(schema)) { // need do schema syn in before sql send conn.setSchema(schema); } // 获取 ConQueue,增加可以执行连接的数量 ConQueue queue = conMap.getSchemaConQueue(schema); queue.incExecuteCount(); conn.setAttachment(attachment); // 每次取连接的时候,更新下 lasttime,防止在前端连接检查的时候,关闭连接,导致sql执行失败 conn.setLastTime(System.currentTimeMillis()); handler.connectionAcquired(conn); return conn; }
主要做了一下几件事:
设置数据库连接状态为已用
获取
ConQueue
,增加可以执行连接的数量-
调用
ResponseHandler#connectionAcquired
,具体实现见GetConnectionHandler#connectionAcquired
,该方法调用BackendConnection#release
释放连接,调用PhysicalDatasource#returnCon
方法将释放的数据库连接放回ConMap
的ConQueue
中@Override public void ResponseHandler#connectionAcquired(BackendConnection conn) { successCons.add(conn); finishedCount.addAndGet(1); logger.info("connected successfully " + conn); conn.release(); } @Override public void JDBCConnection#release() { jdbcDatasource.releaseChannel(this); } public void PhysicalDatasource#releaseChannel(BackendConnection c) { returnCon(c); } private void PhysicalDatasource#returnCon(BackendConnection c) { c.setAttachment(null); c.setBorrowed(false); c.setLastTime(TimeUtil.currentTimeMillis()); ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema()); boolean ok = false; if (c.isAutocommit()) { ok = queue.getAutoCommitCons().offer(c); } else { ok = queue.getManCommitCons().offer(c); } // 若无法放入 ConQueue 则将连接关闭 if (!ok) { LOGGER.warn("can't return to pool ,so close con " + c); c.close("can't return to pool "); } }
以 JDBC 方式创建数据库连接为例,流程如图所示:
5.2.3 总结
MyCat 服务启动时调用 MycatServer 的 startUp 方法对每一个 <dataHost>
节点的多个 <writeHost>
节点对应的数据源做初始化工作。初始创建数据库连接数由 <dataHost>
节点的 minCon
属性值决定。每创建一个 BackendConnection
便回调 GetConnectionHandler#connectionAcquired
将新生成的 connection
的 borrowed
属性设置为 false
(该属性个人理解是标记数据库连接是否空闲),然后将 connection
保存于 ConQueue
中
因此一个 <dataHost>
节点对应一个 PhysicalDBPool
,PhysicalDBPool
类中的 PhysicalDatasource[] writeSources
对应于 <dataHost>
节点下多个 <writeHost>
节点。每个 PhysicalDatasource
中持有一个 ConMap conMap
作为数据源的连接池, 里面存放着可用的数据库连接 BackendConnection
MyCat 根据 <dataNode>
节点的 dataHost
属性和 database
属性,将数据库连接均匀得分配给在同一个 dataHost 中的不同数据库。例如对于以下配置:
<!-- 分片节点 -->
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
<!-- 节点主机 -->
<dataHost name="localhost1" maxCon="10000" minCon="100" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&autoReconnect=true" user="root" password="root">
</writeHost>
</dataHost>
MyCat 会初始化 100 个(minCon="100")数据库连接,并将这 100 个连接均分给 db_demo_01、db_demo_02 和 db_demo_03,如下图所示
5.3 获取数据库连接
本节主要讲述获取可用的数据库连接用于执行客户端的 SQL 请求
5.3.1 涉及的核心类
-
NIOAcceptor
:负责处理 Accept 事件,即 MyCat 作为服务端去处理前端业务程序发过来的连接请求 -
ServerConnectionFactory
:客户端和 MyCat 连接工厂,用于创建客户端连接 -
ServerConnection
:客户端连接(客户端和 MyCat 之间的连接) -
NonBlockingSession
:客户端连接和后端数据库连接的会话,其核心对象有:-
ServerConnection serverConnection
:客户端连接 -
ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap
:存放路由节点和对应的后端数据库连接的容器 -
SingleNodeHandler singleNodeHandler
:单路由节点请求处理器 -
MultiNodeQueryHandler multiNodeHandler
:多路由节点请求处理器
-
-
SingleNodeHandler
(MultiNodeQueryHandler):路由节点请求处理器,其核心对象有:RouteResultSetNode routeResultSetNode
RouteResultset rrs
NonBlockingSession session
PhysicalDBNode
PhysicalDatasource
5.3.2 获取可用数据库连接过程
NIOAcceptor
在接受到前端发来的连接请求后,会调用 ServerConnectionFactory
实例化一个 ServerConnection
,之后实例化一个 NonBlockingSession
注入到 ServerConnection
public class ServerConnectionFactory extends FrontendConnectionFactory {
@Override
protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
SystemConfig systemConfig = MycatServer.getInstance().getConfig().getSystem();
// 将 channel 包装为一个 ServerConnection
ServerConnection serverConnection = new ServerConnection(channel);
// 设置客户端查询处理器
serverConnection.setQueryHandler(new ServerQueryHandler(serverConnection));
// 设置客户端和 MyCat 的一个会话 session
serverConnection.setSession2(new NonBlockingSession(serverConnection));
...
return serverConnection;
}
}
前端 SQL 请求进来之后,MyCat 调用 ServerConnection#routeEndExecuteSQL
进行路由计算并得到路由结果 RouteResultset
,然后调用 NonBlockingSession#execute
进行处理,若 RouteResultset
中包含多个路由节点,则调用 MultiNodeQueryHandler#execute
方法;若 RouteResultset
只包含单个路由节点,则调用 SingleNodeHandler#execute
方法。此处我们假设是单个路由节点
public class NonBlockingSession implements Session {
@Override
public void execute(RouteResultset routeResultset, int type) {
RouteResultSetNode[] nodes = routeResultset.getNodes();
if (nodes.length == 1) {
// 实例化一个 SingleNodeHandler 对象
singleNodeHandler = new SingleNodeHandler(routeResultset, this);
singleNodeHandler.execute();
} else {
multiNodeHandler = new MultiNodeQueryHandler(type, routeResultset, autocommit, this);
multiNodeHandler.execute();
}
}
}
SingleNodeHandler#execute
首先通过 session
获取客户端连接 ServerConnection
以及后端数据库连接 BackendConnection
。第一次获取 BackendConnection
时由于 session
还没有将 routeResultSetNode
和 BackendConnection
绑定,故 backendConnection
返回 null,SingleNodeHandler#execute
要调用 PhysicalDBNode#getConnection
创建一个新的数据库连接,并将其绑定到 session
中
public class SingleNodeHandler implements ResponseHandler {
public void execute() throws Exception {
// 通过 session 拿到客户端连接 ServerConnection
ServerConnection serverConnection = session.getServerConnection();
// 通过 session 拿到后端数据库连接
final BackendConnection backendConnection = session.getTarget(routeResultSetNode);
// 若存在 routeResultsetNode 对应的 BackendConnection
if (session.tryExistsCon(backendConnection, routeResultSetNode)) {
_execute(backendConnection);
} else { // 若不存在 routeResultsetNode 对应的 BackendConnection,则创建新的连接
MycatConfig conf = MycatServer.getInstance().getConfig();
PhysicalDBNode dn = conf.getDataNodes().get(routeResultSetNode.getName());
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
}
}
}
PhysicalDBNode#getConnection
从分片节点 dataNode
的数据库连接池中获取一个可写的 PhysicalDatasource
,并调用 PhysicalDatasource#getConnection
从 ConMap 中获取一个可用的数据库连接
public class PhysicalDBNode {
public void getConnection(String schema, boolean autoCommit, RouteResultSetNode routeResultSetNode, ResponseHandler handler, Object attachment) throws Exception {
// 从分片节点 dataNode 的数据库连接池中获取一个可写的 PhysicalDatasource
PhysicalDatasource writeSource = dbPool.getSource();
writeSource.getConnection(schema, autoCommit, handler, attachment);
}
}
PhysicalDatasource#getConnection
从 ConMap
中获取一个可用的数据库连接后,调用 PhysicalDatasource#takeCon
将获取的 connection
标记为已用(borrowed = true)
public class PhysicalDatasource {
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment)
throws IOException {
// 从 conMap 中拿取已建立好的后端连接
BackendConnection connection = this.conMap.tryTakeCon(schema, autocommit);
if (connection != null) {
takeCon(connection, handler, attachment, schema);
}
}
private BackendConnection takeCon(BackendConnection backendConnection, final ResponseHandler handler, final Object attachment, String schema) {
// 标记该连接为已用
backendConnection.setBorrowed(true);
handler.connectionAcquired(backendConnection);
return backendConnection;
}
}
之后调用 ResponseHandler#connectionAcquired
进行数据库连接获取后的确认逻辑,此处调用的实际实现类为 SingleNodeHandler
,其在创建数据库连接时将自己作为 ResponseHandler
传入 PhysicalDBNode#getConnection
中
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
SingleNodeHandler
本身实现了 ResponseHandler
接口,并实现了 connectionAcquired
方法,具体代码如下:
public class SingleNodeHandler implements ResponseHandler {
public void connectionAcquired(final BackendConnection backendConnection) {
// 实现 session 和后端数据库连接 backendConnection 的绑定
// 将 routeResultsetNode 对应的后端连接记录在 session 的 backendConnectionMap 中
session.bindConnection(routeResultSetNode, backendConnection);
_execute(backendConnection);
}
}
因此,PhysicalDatasource#getConnection
从 ConMap
中获取一个可用的数据库连接后,首先将该连接标记为已用(borrowed = true),然后将 BackendConnection
和对应的路由节点 RouteResultSetNode
绑定到 NonBlockingSession
的 backendConnectionMap
中,最后调用 _execute(backendConnection)
进行 SQL 请求处理,具体客户端 SQL 请求执行逻辑参见:三、客户端 SQL 请求执行流程
5.3.3 总结
当客户端发送 SQL 请求至 MyCat 时,MyCat 首先在 NonBlockingSession
的 ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap
中查找请求 SQL 路由节点 RouteResultSetNode
对应的 BackendConnection
是否存在,若存在则返回,继续执行后续操作;若不存在,则从 PhysicalDatasource
的 ConMap
中获取一个可用的数据库连接 BackendConnection
,并将其标记为已用(BackendConnection
的 borrowed
属性设置为 true
),然后将 BackendConnection
和 RouteResultSetNode
注册于 NonBlockingSession
的 backendConnectionMap
中
5.4 释放已用的数据库连接
todo
5.5 关闭数据库连接
todo