title: 归还连接池
date: 2022-05-17 09:17
连接什么时候归还
当逻辑走到 connection 的 close
方法时,归还连接.Druid
实现了 Connection
方法,并且在此之前封装了自己的 recycle
方法
void close() throws SQLException
DruidPooledConnection 的 recycle 方法
DruidPooledConnection
类中,封装了 recycle
方法,实际最终调用的是DruidDataSource
的 recycle
方法.再通过DruidDataSource
的recycle
方法调用Conection
的 close
方法
DruidPooledConnection,DruidDataSource,Conection的关系
Druid回收连接到连接池简要类图.jpg
作用
- 职责剥离,最底层的
Conection
方法负责与数据库物理连接的关闭工作. -
DruidDataSource
的recycle
方法作为Conection
的上层.对自己封装的DruidConnectionHolder
进行清理,清理后统计数值的变化,以及归还(testOnReturn
)时的校验,DruidDataSource
的recycle
方法只暴露给DruidPooledConnection
调用. -
DruidPooledConnection
又在DruidDataSource
只上封装了一层,对外提供方法供应用层显示调用,做一些前置校验判断,最终调用DruidDataSource
的recycle
方法
DruidPooledConnection 的 recycle 源码分析
/**
* 可以被显示调用 conn.recycle();
* @throws SQLException
*/
public void recycle() throws SQLException {
// default disable = false
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
// 是否重复关闭日志开启状态 走到这里说明holder已经没了,不需要回收了.都没了~
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
// 判断这个连接是不是被遗弃的,如果被遗弃了也不用回收
if (!this.abandoned) {
// 获取这个连接对应的数据源.因为多数据源有可能共享一个连接池
DruidAbstractDataSource dataSource = holder.getDataSource();
// 最终调用的是DruidDataSource的recycle方法
dataSource.recycle(this);
}
// 如果 !this.abandoned =false 那么就不会回收
this.holder = null;
conn = null;
// 这个链接当前的事务信息
transactionInfo = null;
closed = true;
}
DruidAbstractDataSource的recycle抽象方法
该父类抽象了回收方法的定义,可扩展让不同具现化的数据源实现
// DriudDataSource继承了父类DruidAbstractDataSource的抽象方法,子类具体实现了该方法
protected abstract void recycle(DruidPooledConnection pooledConnection)
DruidDataSource 的 recycle 具体实现
/**
* 回收连接
*/
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
// 获取连接池中的holder
final DruidConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}
if (logDifferentThread //
// 是否开启了异步回收 默认false
&& (!isAsyncCloseConnectionEnable()) //
// 如果当前线程和创建连接的线程不是一个
&& pooledConnection.ownerThread != Thread.currentThread()//
) {
LOG.warn("get/close not same thread");
}
// 获取物理连接
final Connection physicalConnection = holder.conn;
// pooledConnection.traceEnable与activeConnections都是与removeAbandoned机制相关的参数
if (pooledConnection.traceEnable) {
Object oldInfo = null;
activeConnectionLock.lock();
try {
if (pooledConnection.traceEnable) {
oldInfo = activeConnections.remove(pooledConnection);
pooledConnection.traceEnable = false;
}
} finally {
activeConnectionLock.unlock();
}
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
}
// 是否自动提交
final boolean isAutoCommit = holder.underlyingAutoCommit;
// 是不是只读,影响事务处理
final boolean isReadOnly = holder.underlyingReadOnly;
// 是否归还时验证
final boolean testOnReturn = this.testOnReturn;
try {
// check need to rollback?
// 如果不是自动提交 并且 是写操作
if ((!isAutoCommit) && (!isReadOnly)) {
// 那么就回滚..这说明回滚操作也会走到这
pooledConnection.rollback();
}
// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
// todo 如果不是同一个线程,reset,为啥会是不通线程?不通线程要上锁?
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
// 如果是同一个线程 也要充值
holder.reset();
}
// 如果是丢弃 那么不做回收
if (holder.discard) {
return;
}
//
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder);
return;
}
// 如果已经关闭了物理连接 相关的统计数扣减
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
// 活动线程--
activeCount--;
// 活动状态
holder.active = false;
}
// 关闭数量++
closeCount++;
} finally {
lock.unlock();
}
return;
}
// 如果开启了归还验证
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
// 验证不通过销毁这个连接
if (!validate) {
// 直接关闭
JdbcUtils.close(physicalConnection);
// 销毁数++
destroyCountUpdater.incrementAndGet(this);
// 锁统计线程 扣减
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
// String类型initSchema 判断不严谨
if (holder.initSchema != null) {
holder.conn.setSchema(holder.initSchema);
holder.initSchema = null;
}
if (!enable) {
// 丢弃这个连接 里面jdbc close,close成功把这个holder的discard=true,避免对一个连接的重复关闭
discardConnection(holder);
return;
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
// phyTimeoutMillis 物理连接超时配置,默认-1,是毫秒数
if (phyTimeoutMillis > 0) {
// 当前时间 - 连接时间 大于了 配置的物理连接超时时间,回收
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder);
return;
}
}
// 统计操作都要锁,为了准确性
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
holder.clearStatementCache();
if (!holder.discard) {
discardConnection(holder);
holder.discard = true;
}
LOG.error("recyle error", e);
recycleErrorCountUpdater.incrementAndGet(this);
}
}
/**
* 关闭链接
* @param holder
*/
public void discardConnection(DruidConnectionHolder holder) {
if (holder == null) {
return;
}
// 获取 connection对象
Connection conn = holder.getConnection();
if (conn != null) {
JdbcUtils.close(conn);
}
// 上锁开始修改统计值
lock.lock();
try {
// 避免重复关闭报错
if (holder.discard) {
return;
}
if (holder.active) {
// 活跃数--
activeCount--;
// 该holder的活跃状态改成false
holder.active = false;
}
// 回收数++
discardCount++;
holder.discard = true;
// 如果活跃数小于了最小活跃数,就唤醒生产者允许继续创建线程直到minIdle
if (activeCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}