关于 HighAvailableDataSource
的介绍,我们可以看一下官方文档相关的介绍,官方文档主要介绍如下几个作用:
- 节点路由 - 根据节点名称指定路由,随机路由,粘性随机路由
- 节点配置 - 纯手工配置节点,根据配置文件生成节点,根据ZooKeeper信息生成节点
- 节点健康检查 - 基于ValidConnectionChecker的节点检查机制,检查间隔时间可根据运行情况动态调整。
我们接下来来测试一下这几部分的内容。
路由节点
首先我们来测试路由节点功能,路由节点需要我们配置多个数据源,然后注入到 HighAvailableDataSource
中,并最终生成 Datasource
。我们先来看一下配置类的信息 :
@org.springframework.context.annotation.Configuration
public class Configuration {
@Autowired
ApplicationContext applicationContext;
@Autowired
@Qualifier("dataSource1")
DataSource dataSource1;
@Autowired
@Qualifier("dataSource2")
DataSource dataSource2;
@Bean(initMethod = "init")
public DataSource dataSource1() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUsername("root");
druidDataSource.setPassword("root");
druidDataSource.setUrl("jdbc:mysql://localhost:3306/information_schema?useSSL=false&useUnicode=true&characterEncoding=UTF-8");
return druidDataSource;
}
@Bean(initMethod = "init")
public DataSource dataSource2() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUsername("root");
druidDataSource.setPassword("root");
druidDataSource.setUrl("jdbc:mysql://localhost:3306/information_schema?useSSL=false&useUnicode=true&characterEncoding=UTF-8");
return druidDataSource;
}
@Bean(initMethod = "init", destroyMethod = "destroy")
public DataSource dataSource() {
HighAvailableDataSource druidDataSource = new HighAvailableDataSource();
Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("dataSources1", dataSource1);
dataSourceMap.put("default", dataSource1);
dataSourceMap.put("dataSources2", dataSource2);
druidDataSource.setDataSourceMap(dataSourceMap);
druidDataSource.setSelector("byName");
return druidDataSource;
}
@Bean
public SqlSessionFactoryBean sqlSessionFactory(DataSource dataSource) throws IOException {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSource);
sqlSessionFactory.setMapperLocations(applicationContext.getResources("classpath:mapper/*.xml"));
return sqlSessionFactory;
}
}
如图,我们首先要生成了两个 Datasource , 最后注入到 HighAvailableDataSource
,有些同学会注意到,这里不是使用 ObjectProvider
来获取多个数据源数据源,也是经过一位大佬的提醒,我才意识到这个问题,构造注入的方式是没法解决循环依赖的,而这里恰好有循环依赖的问题。
回到正题,我们最后初始化完成后,日志会打印这两个数据源初始化完成。接着我们就可以像使用同个数据源来使用他了。
根据ZooKeeper生成节点集合
在开始Demo前,我们需要先在 zookeeper 注册数据源相关的节点,如下:
ZookeeperNodeRegister register = new ZookeeperNodeRegister();
register.setZkConnectString("127.0.0.1:2181");
register.setPath("/ha-druid-datasources");
register.init();
List<ZookeeperNodeInfo> payload = new ArrayList<ZookeeperNodeInfo>();
ZookeeperNodeInfo node = new ZookeeperNodeInfo();
node.setPrefix("ha");
node.setHost("127.0.0.1");
node.setPort(3306);
node.setDatabase("information_schema");
node.setUsername("root");
node.setPassword("root");
payload.add(node);
register.register("datasource1", payload);
ZookeeperNodeRegister register2 = new ZookeeperNodeRegister();
register2.setZkConnectString("127.0.0.1:2181");
register2.setPath("/ha-druid-datasources");
register2.init();
List<ZookeeperNodeInfo> payload2 = new ArrayList<ZookeeperNodeInfo>();
ZookeeperNodeInfo node2 = new ZookeeperNodeInfo();
node2.setPrefix("ha");
node2.setHost("127.0.0.1");
node2.setPort(3307);
node2.setDatabase("information_schema");
node2.setUsername("root");
node2.setPassword("root");
payload2.add(node2);
register2.register("datasource2", payload2);
Thread.sleep(1000 * 60 * 60);
我们注册了两个数据源信息,分别是本地的 3306 和 3307 端口,然后停止该线程一个小时,我们需要注意的是假如线程挂了,那么注册的临时几点也将消失,所以需要 sleep ,接下来我们到zk 上看一下注册节点的信息如下:
zk 准备好后我们使用对数据源进行配置,如下:
@Bean
public SqlSessionFactoryBean sqlSessionFactory(DataSource dataSource) throws IOException {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSource);
sqlSessionFactory.setMapperLocations(applicationContext.getResources("classpath:mapper/*.xml"));
return sqlSessionFactory;
}
@Bean(initMethod = "init", destroyMethod = "destroy")
public DataSource dataSource(ZookeeperNodeListener zkNodeListener) {
HighAvailableDataSource druidDataSource = new HighAvailableDataSource();
druidDataSource.setNodeListener(zkNodeListener);
druidDataSource.setSelector("random");
druidDataSource.setPoolPurgeIntervalSeconds(60);
druidDataSource.setAllowEmptyPoolWhenUpdate(false);
return druidDataSource;
}
@Bean
public ZookeeperNodeListener zkNodeListener() {
ZookeeperNodeListener zookeeperNodeListener = new ZookeeperNodeListener();
zookeeperNodeListener.setZkConnectString("localhost:2181");
zookeeperNodeListener.setPath("/ha-druid-datasources");
zookeeperNodeListener.setUrlTemplate("jdbc:mysql://${host}:${port}/${database}?useUnicode=true");
zookeeperNodeListener.setPrefix("ha");
return zookeeperNodeListener;
}
与之前使用两个 datasource
不同,这里是直接注入了一个zookeeperNodeListener
,即不使用默认的随机 selector
。接着我们启动服务,可以看到以下信息,代表我们数据源初始化成功。
我们可以先测试一下当前数据是否可用,然后试着断开刚才
sleep
的线程,可以看到日志打印出如下信息,删除节点成功:还有关闭数据源的日志如下:
我们可以看到这里只删除了
datasource1
, 我们重新进行测试,还是能正常获取到 datasource2
。这里主要是我们设置了如下属性,所以并不会删除所有的节点:
druidDataSource.setAllowEmptyPoolWhenUpdate(false);
还有最后一个部分就是 节点健康检查 ,这部分主要是一些配置相关的,就不进行细讲了,有兴趣的同学可以自己配置试一下。