今天我们来说一说主从库的读写分离的问题
主从结构
一般情况下主从的结构是由一个Master和多个Slave来组成的, Slave的个数和数据库的业务访问量有密切的关系
MySQL读写分离配置的主要流程
- 在进行数据的更改之前Master会通知存储引擎开启事务,Master会把数据库的操作记录在一个Binary Log中,在写入完成之后,Master通知存储引擎提交事务,这样的一次操作称为一次二进制日志事件;
- Slave开启一个IO线程,把Binary Log中的日志拷贝到自己的Relay log中,如果读取完成之后,Slave的这个线程就会睡眠,等待Master的Binary产生新的数据;
- Slave会把Delay Log中的数据重新执行,使得日志中的数据重现在Slave的数据库中;
一、主从读写分离的配置
- 准备两台服务器并分别安装MySQL软件
- 配置主服务器,编辑主服务器的配置文件
- 在配置文件的mysqld节点下面添加mysql-id和log-bin以及log-bin-index,如下
server-id=1 log-bin=master-bin log-bin-index=master-bin.index
- 配置完成后重启:
service mysqld restart
- 查看Master的状态:
show master status
- 创建用户并赋予从服务器的访问权限:
create user '<user-name>'; grant replication slave on *.* to '<user-name>'@'<host>' identified by '<access-pwd>'; flush privileges;
- 在配置文件的mysqld节点下面添加mysql-id和log-bin以及log-bin-index,如下
- 配置从服务器,编辑配置文件
- 在配置文件的mysqld节点下面添加如下的内容,需要注意的是干掉其中server-id=1,在mysqld中重新添加如下内容:
server-id=2 relay-log=slave-relay-bin relay-log-index=slave-relay-bin.index
- 配置完成后重启:
service mysqld restart
- 把从服务器挂到主服务器之下,连接到从服务器上,执行如下指令
change master to master_host='<master-ip>',master_port=<master-ip>,master_user='<user-name>',master_password='<access-pwd>',master_log_file='<master_status-file-name>',master-log-pos=0;
- 开始监听主服务器:
start slave
- 查看从库的状态:
show slave status \G
- 在配置文件的mysqld节点下面添加如下的内容,需要注意的是干掉其中server-id=1,在mysqld中重新添加如下内容:
- 测试主从:在主库上进行操作,后查看从库的数据变化
二、MySQL读写分离代码编写
-
编写一个读写分离的类
DynamicDataSource
,这个类继承自AbstractRoutingDataSource
,实现其中的方法,如:import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; public class DynamicDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DynamicDataSourceHolder.getDataSourceType(); } }
-
编写一个用来维护连接类型的类
DynamicDataSourceHolder
,为了线程安全可以使用ThreadLocal
来存放两个类型:Master
和Slave
:import org.springframework.util.Assert; public class DynamicDataSourceHolder { public static final String DATASOURCE_TYPE_MASTER = "master"; public static final String DATASOURCE_TYPE_SLAVE = "slave"; private static ThreadLocal<String> typeHolder = new ThreadLocal<>(); public static String getDataSourceType() { String type = typeHolder.get(); if (type == null) { type = DATASOURCE_TYPE_MASTER; } return type; } public static void setDataSourceType(String type) { Assert.notNull(type, "type not allowed to be null"); typeHolder.set(type); } public static void clearDataSourceType() { typeHolder.remove(); } }
-
编写一个用来根据请求自动返回数据库连接类型的拦截器的类
DynamicDataSourceInteceptor
,该类实现MyBatis的拦截器接口:org.apache.ibatis.plugin.Interceptor
:import java.util.Locale; import java.util.Properties; import org.apache.ibatis.executor.Executor; import org.apache.ibatis.executor.keygen.SelectKeyGenerator; import org.apache.ibatis.mapping.BoundSql; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.mapping.SqlCommandType; import org.apache.ibatis.plugin.Interceptor; import org.apache.ibatis.plugin.Intercepts; import org.apache.ibatis.plugin.Invocation; import org.apache.ibatis.plugin.Plugin; import org.apache.ibatis.plugin.Signature; import org.apache.ibatis.session.ResultHandler; import org.apache.ibatis.session.RowBounds; import org.springframework.transaction.support.TransactionSynchronizationManager; @Intercepts({ @Signature(type=Executor.class, method="update", args={MappedStatement.class, Object.class}), @Signature(type=Executor.class, method="query", args={MappedStatement.class, RowBounds.class, ResultHandler.class, Object.class}) }) public class DynamicDataSourceInteceptor implements Interceptor { @Override public Object intercept(Invocation invocation) throws Throwable { boolean txActive = TransactionSynchronizationManager.isActualTransactionActive(); String type = DynamicDataSourceHolder.DATASOURCE_TYPE_MASTER; if (txActive == false) { Object[] args = invocation.getArgs(); MappedStatement statement = (MappedStatement) args[0]; if (statement.getSqlCommandType().equals(SqlCommandType.SELECT)) { if (statement.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) { type = DynamicDataSourceHolder.DATASOURCE_TYPE_MASTER; } else { BoundSql boundSql = statement.getSqlSource().getBoundSql(args[1]); String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\n\\r\\t]", " "); if (sql.matches(".*insert\\u0020.*|.*update\\u0020.*|.*delete\\u0020.*")) { type = DynamicDataSourceHolder.DATASOURCE_TYPE_MASTER; } else { type = DynamicDataSourceHolder.DATASOURCE_TYPE_SLAVE; } } } } DynamicDataSourceHolder.setDataSourceType(type); return invocation.proceed(); } @Override public Object plugin(Object target) { if (target instanceof Executor) { return Plugin.wrap(target, this); } return target; } @Override public void setProperties(Properties properties) { } }
-
在Mybatis配置中配置拦截器
<plugins> <plugin interceptor="org.sherekr.dfo.core.dao.datasource.DynamicDataSourceInteceptor" /> </plugins>
-
Spring配置文件中配置两个数据源,一个用来执行DML的数据源,一个用来执行DQL的数据源,同时配置动态数据源和懒连接数据源代理
<bean id="abstractDataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="true" destroy-method="close"> <property name="driverClass" value="${jdbc.driver}" /> <property name="user" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> <property name="maxPoolSize" value="30" /> <property name="minPoolSize" value="10" /> <property name="autoCommitOnClose" value="false" /> <property name="checkoutTimeout" value="10000" /> <property name="acquireRetryAttempts" value="2" /> </bean> <bean id="masterDataSource" parent="abstractDataSource"> <property name="jdbcUrl" value="${jdbc.master.url}" /> </bean> <bean id="slaveDataSource" parent="abstractDataSource"> <property name="jdbcUrl" value="${jdbc.slave.url}" /> </bean> <bean id="dynamicDataSource" class="org.sherekr.dfo.core.dao.datasource.DynamicDataSource"> <property name="targetDataSources"> <map> <entry key="master" value-ref="masterDataSource" /> <entry key="slave" value-ref="slaveDataSource" /> </map> </property> </bean> <bean id="dataSource" class="org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy"> <property name="targetDataSource" ref="dynamicDataSource" /> </bean>
这个代码已经是几年前写的代码了,运行时可能会遇到一些问题,大家就见招拆招吧,如果遇到问题欢迎吐我