[druid 源码解析] 4 获取连接

我们回头看DataSource的接口,它里面只定义了两个方法,如下,我们今天来分析第一个也是最重要的方法 getConnection:

public interface DataSource  extends CommonDataSource, Wrapper {
  Connection getConnection() throws SQLException;

  Connection getConnection(String username, String password)
    throws SQLException;
}

1.1 具体实现

我们先看一下 DruidDatasource 的具体实现:

    @Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            // 遍历所有 Filter 
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

这里先调用了 init() 方法,来初始化,这个流程我们上面已经提到,假如已经初始化完成就会直接返回,接下来遍历所有的filter,这里是一种责任链模式,FilterChainImpl负责遍历所有的 Filter,主要流程是 FilterChainImpl先判断,当前filter的位置是不是最后的 ,假如是,就调用实际需要执行的方法,假如不是,就获取下一个 filter,并将自己传给 filterfilter在处理的时候是先调用 FilterChainImpl来获取实际的结果,最后自己才对结果进行处理,有点像入栈出栈流程。

1.2 责任链模式

一开始看责任链模式会有点绕,所以我直接写了个简单的例子来模拟这个流程,首先我们有两个接口,一个是 Filter 一个是 FilterChain:

public interface Filter {
    public int filter(FilterChain chain);
}
public interface FilterChain {
    public int doFilter();
}

接着我们做 FilterChain 的实现类, 这里的关键就是他需要持有 filter 的链,然后自己定义具体链的位置,最后最重要的是这个判断。(这里可以先忽略构造方法)

public class FilterChainImpl implements FilterChain {

    List<Filter> filters;

    int pos;

    public FilterChainImpl() {
        filters = new ArrayList<>();
        filters.add(new CFilter());
        filters.add(new BFilter());
        filters.add(new AFilter());
        pos = 0;
    }

    @Override
    public int doFilter() {
        if (pos < filters.size()) {
            getNexFilter().filter(this);
        }
        return 1;
    }

    public Filter getNexFilter() {
        return filters.get(pos++);
    }
}

接着我们简答地实现一个 Filter,最关键的逻辑是需要先调用 chain 去获取结果,在对结果进行处理:

public class AFilter implements Filter {
    @Override
    public int filter(FilterChain chain) {
        int result = chain.doFilter();
        System.out.println("AFilter filter " + result);
        return 0;
    }
}

最终我们的输出如下:


image.png

内部获取链接

有了上述的例子,我们其实最后调用的就是 FilterChainImplgetConnection 方法的最后一行即可,即调用了 getConnectionDirect 方法。

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (; ; ) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {
                // 真正去获取 connection
                poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {
                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

            if (testOnBorrow) {
                // 测试 connection 是否可用
                boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                if (!validate) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }
                    // 假如不可用就断开链接
                    discardConnection(poolableConnection.holder);
                    continue;
                }
            } else {
              // 对链接进行校验
                .....
            }
          // 产看是否需要检查活动线程,假如需要就放到  activeConnections 集合中。
            if (removeAbandoned) {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                poolableConnection.connectStackTrace = stackTrace;
                poolableConnection.setConnectedTimeNano();
                poolableConnection.traceEnable = true;

                activeConnectionLock.lock();
                try {
                    activeConnections.put(poolableConnection, PRESENT);
                } finally {
                    activeConnectionLock.unlock();
                }
            }

            if (!this.defaultAutoCommit) {
                poolableConnection.setAutoCommit(false);
            }

            return poolableConnection;
        }
    }

流程如下:

  • 调用getConnectionInternal获取经过各种包装的Connection,这个是获取连接的主要逻辑,支持超时时间,由DruidDataSource的maxWait参数指定,单位毫秒。
  • 如果testOnBorrow为true,则进行对连接进行校验,校验失败则进行清理并重新进入循环,否则跳到下一步。
  • 如果testWhileIdle为true,距离上次激活时间超过timeBetweenEvictionRunsMillis,则进行清理。
  • 如果removeAbandoned为true,则会把连接存放在activeConnections中,清理线程会对其定期进行处理。
    接下来,我们看一下 getConnectionInternal 方法:
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
          ......
        final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
        final int maxWaitThreadCount = this.maxWaitThreadCount;

        DruidConnectionHolder holder;

        for (boolean createDirect = false; ; ) {
          // 每次都是重新创建模式,就执行下面逻辑。
            if (createDirect) {
             ........
            }

            try {
            // 获取锁
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException("interrupt", e);
            }

            try {
              // 检查是否到达最大等待线程数量线程
                if (maxWaitThreadCount > 0
                    && notEmptyWaitThreadCount >= maxWaitThreadCount) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new ***
                }
              // 查看是否有报错,有就抛出去
               .....

                connectCount++;
// 检查创建的线程池是否已经不够了,不够就直接创建
                if (createScheduler != null
                    && poolingCount == 0
                    && activeCount < maxActive
                    && creatingCountUpdater.get(this) == 0
                    && createScheduler instanceof ScheduledThreadPoolExecutor) {
                    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                    if (executor.getQueue().size() > 0) {
                        createDirect = true;
                        continue;
                    }
                }
                // 这两个方法仅仅是有是否有超时时间决定。
                if (maxWait > 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                }

                if (holder != null) {
                    if (holder.discard) {
                        continue;
                    }

                    activeCount++;
                    holder.active = true;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();
                    }
                }
            } catch (InterruptedException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException(e.getMessage(), e);
            } catch (SQLException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw e;
            } finally {
                lock.unlock();
            }

            break;
        }

        if (holder == null) {
                // 创建错误信息
                  ....
            }

            String errorMessage = buf.toString();

            if (createError != null) {
                throw new GetConnectionTimeoutException(errorMessage, createError);
            } else {
                throw new GetConnectionTimeoutException(errorMessage);
            }
        }

        holder.incrementUseCount();

        DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
        return poolalbeConnection;
    }
  • 检查创建的线程池是否已经不够了,不够就直接创建。
  • 调用 pollLast(nanos); 或者 takeLast(); 这两者仅仅是是否有超时时间的区别。

和创建连接线程协作

我们直接来分析 takeLast();

DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            while (poolingCount == 0) {
                // 发送信号通知创建链接线程去创建连接
                emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }
                // 将等待线程加一
                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    // 等待创建好连接
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }
        // 获取空闲连接数组 connections 的最后一个线程,并返回
        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

这里很多细节就和我们之前的对应上了,首先是发送信号,让创建线程创建线程池,然后判断等待线程首先是否当前等待线程大于阈值,是的话就抛错。然后调用 notEmpty.await() 等待创建线程的通知。
最后将 Connection 从活动线程借出来。

检查链接

我们回到上面,当拿connection 后需要检查链接是否存活,调用 testConnectionInternal 方法,最终调用 MySqlValidConnectionCheckerisValidConnection 方法。我们通过Debug 发现最终调用的就是 JDBC4MysqlpingInternal 方法, 如下:

check

到这里,我们完成了 getConnection 的工作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容