DruidCP源码阅读2 -- init过程

1、init过程

在getConnection获取连接时最开始执行init()方法,执行初始化操作

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    // 在初始化dataSource时并不会执行init,而是在第一次获取连接的时候进行init
    init();

    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}

init采用Double Check机制防止重复执行,因为在线程释放锁的时候,初始化过程可能还未完成 ;
线程上锁lock.lockInterruptibly()

    public void init() throws SQLException {
        if (inited) {
            return;
        }

        // bug fixed for dead lock, for issue #2980
        DruidDriver.getInstance();

        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            // DCL
            if (inited) {
                return;
            }

            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

            // 分配数据源id{dataSource-id}
            this.id = DruidDriver.createDataSourceId();
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
                // 如果jdbcUrl 以jdbc:wrap-jdbc,那么启动代理
                initFromWrapDriverUrl();
            }

            // 遍历所有配置的filter,并且执行它们的init方法
            for (Filter filter : filters) {
                filter.init(this);
            }

            // 根据jdbcUrl来判断是哪种数据源
            if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
                this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
            }

        。。。

            // SPI 方式创建filter对象
            initFromSPIServiceLoader();

            // 根据配置的driver,配置驱动
            resolveDriver();

            // 对一些数据源进行特殊检查
            initCheck();

            // 根据不同的数据源分配不同的异常处理器
            initExceptionSorter();
            initValidConnectionChecker();
            validationQueryCheck();

            if (isUseGlobalDataSourceStat()) {
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbTypeName);
                }
            } else {
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
            }
            dataSourceStat.setResetStatEnable(this.resetStatEnable);

            // 创建三个连接池
            // 可被获取的连接池
            connections = new DruidConnectionHolder[maxActive];
            // 待清理的连接池
            evictConnections = new DruidConnectionHolder[maxActive];
            // 存活的连接池
            keepAliveConnections = new DruidConnectionHolder[maxActive];

            SQLException connectError = null;

            if (createScheduler != null && asyncInit) {
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
                // init connections
                while (poolingCount < initialSize) {
                    try {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

            createAndLogThread();
            createAndStartCreatorThread();
            createAndStartDestroyThread();

            initedLatch.await();
            init = true;

            initedTime = new Date();
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (RuntimeException e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (Error e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;

        } finally {
            inited = true;
            lock.unlock();

            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }
    }

initFromSPIServiceLoader():SPI方式初始化filter

    private void initFromSPIServiceLoader() {
        if (loadSpifilterSkip) {
            return;
        }

//        if (autoFilters == null) {
            List<Filter> filters = new ArrayList<Filter>();
            // 对ServiceLoader中的service、loader、acc进行赋值,并且调用reload()方法
            ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

            // 这里会调用ServiceLoader中的iterator迭代器,执行lookupIterator.next().nextService()创建filter对象
            for (Filter filter : autoFilterLoader) {
                AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
                if (autoLoad != null && autoLoad.value()) {
                    filters.add(filter);
                }
            }
            autoFilters = filters;
//        }

        for (Filter filter : autoFilters) {
            if (LOG.isInfoEnabled()) {
                LOG.info("load filter from spi :" + filter.getClass().getName());
            }
            addFilter(filter);
        }
    }

ServiceLoad的构造器对service、load、acc成员变量进行赋值,并执行reload()方法
reload()方法中对成员变量lookupIterator赋值一个LazyIterator实例,即延迟创建filter对象
在如下代码执行的时候,会进入ServiceLoader的迭代器iterator()方法

for (Filter filter : autoFilterLoader) {
    ...
}

在iterator()方法中会实例化一个Iterator迭代器,并实现了next()方法,next()方法就是刚才的LazyIterator延迟创建对象,并且会执行nextService()

return lookupIterator.next();

在nextService()中就能看到Class.forName()创建对象了,到此为止filter已经被创建出来

        private S nextService() {
           。。。
            try {
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,
                     "Provider " + cn + " not found");
            }
           。。。
        }

ServiceLoader有一个成员变量 PREFIX,就是在hasNextService()的时候进行配置类的动态加载

public final class ServiceLoader<S>
    implements Iterable<S>
{

    private static final String PREFIX = "META-INF/services/";
。。。
}
// 存放所有连接的连接池
connections = new DruidConnectionHolder[maxActive];
// 待清理连接的连接池
evictConnections = new DruidConnectionHolder[maxActive];
// 存活连接的连接池
keepAliveConnections = new DruidConnectionHolder[maxActive];

这三个数组存放的是DruidConnectionholer对象,并且对Connection进行在一次的封装,封住为PhysicalConnectionInfo类
初始化连接池的时候可分为同步和异步
异步是通过ScheduledExecutorService调度线程池去分配线程进行异步创建
createPhysicalConnection()创建物理connection,并封装成PhysicalConnectionInfo对象
在实际开发中会遇到密码加密问题,在配置文件中配置的password为密文,那么项目写一个继承DruidPasswordCallback的类,在getPasswordCallback()的时候会回调自己实现的setProperties方法,并在里面可以自定义password

 public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
        String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) {
            user = getUserCallback().getName();
        } else {
            user = getUsername();
        }

        String password = getPassword();
        // 密码回调,配置文件中配置密文pwd,继承DruidPasswordCallback,在setPropertes中进行解密并赋值pwd
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) {
            // 项目中继承DruidPasswordCallback会回调自己实现的password加/解密方式
            if (passwordCallback instanceof DruidPasswordCallback) {
                DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            }

            char[] chars = passwordCallback.getPassword();
            if (chars != null) {
                password = new String(chars);
            }
        }

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) {
            physicalConnectProperties.putAll(connectProperties);
        }

        if (user != null && user.length() != 0) {
            physicalConnectProperties.put("user", user);
        }

        if (password != null && password.length() != 0) {
            physicalConnectProperties.put("password", password);
        }

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        // 在获取connection后,会执行show variables,得到<variable_name,variables>,赋值给map
        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;

        // 在获取connection后,会执行show global variables,得到<variable_name,variables>,赋值给map
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try {
            // 调用driver获取1个connection物理连接
            conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) {
                throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
            }

            // 做一些初始化配置,配置autocommit、执行一些initsql等等
            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            // 验证connection
            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
        } 
。。。

        // 将conneciton和其他参数封装成PhysicalConnectionInfo实例返回
        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    }
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容