为什么要做数据库读写分离
大多数互联网业务往往读多写少,这时数据库的读会首先成为数据库的瓶颈。如果我们希望能够线性地提升数据库的读性能,并消除读写锁冲突从而提升数据库的写性能,那么就可以使用“分组架构”(读写分离架构)。
用一句话概括,读写分离是用来解决数据库的读性能瓶颈的。
解决方案
- 代理、工具,如:MySQL Proxy、Mycat
- 项目内编码实现(本文中介绍这种)
步骤
添加依赖支持
<!-- AOP starter: the DataSourceAspect in this article is written with AspectJ annotations (@Aspect, @Before, @After). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- JDBC starter: presumably supplies the DataSourceBuilder / DataSourceTransactionManager classes imported by DataSourceConfig. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- MySQL JDBC driver: the configuration in this article sets driver-class-name to com.mysql.jdbc.Driver. -->
<!-- No <version> is given here; presumably it is managed by the project's parent/BOM (confirm). -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
添加相关配置
# Data sources for read/write splitting.
# DataSourceConfig binds "spring.datasource.master" to the read-write pool and each
# entry of the "spring.datasource.slave" list (slave[0], slave[1], ...) to a read pool.
spring:
  datasource:
    # Read-write (master) pool.
    master:
      pool-name: master
      driver-class-name: com.mysql.jdbc.Driver
      jdbc-url: jdbc:mysql://127.0.0.1:3306/payment
      username: user_RW
      password: 123456
      maximum-pool-size: 10
      minimum-idle: 5
    # Read-only (slave) pools.
    # NOTE(review): in this sample both entries use the same jdbc-url as the master;
    # point them at the real replica hosts in an actual deployment.
    slave:
      - pool-name: slave1
        driver-class-name: com.mysql.jdbc.Driver
        jdbc-url: jdbc:mysql://127.0.0.1:3306/payment
        username: user_R
        password: 123456
        maximum-pool-size: 10
        minimum-idle: 5
      - pool-name: slave2
        driver-class-name: com.mysql.jdbc.Driver
        jdbc-url: jdbc:mysql://127.0.0.1:3306/payment
        username: user_R
        password: 123456
        maximum-pool-size: 10
        minimum-idle: 5
配置数据源、事务管理
import com.ssic.payment.core.datasource.DSNames;
import com.ssic.payment.core.datasource.DataSourceAspect;
import com.ssic.payment.core.datasource.DynamicDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that exposes one master and two slave data sources behind a
 * routing {@link DynamicDataSource}, and builds the transaction manager on top of it.
 *
 * <p>The configuration is conditional on {@code JdbcOperations} being present and on the
 * {@code spring.datasource.master.jdbc-url} property. Transaction management is enabled
 * with {@code order = 100}.
 */
@Configuration
@EnableTransactionManagement(order = 100)
@ConditionalOnClass(JdbcOperations.class)
@ConditionalOnProperty(prefix = "spring.datasource", name = "master.jdbc-url")
public class DataSourceConfig {

    /** Registers the aspect that switches the data source around annotated service methods. */
    @Bean
    public DataSourceAspect DataSourceAspect() {
        return new DataSourceAspect();
    }

    /** Master data source, bound from {@code spring.datasource.master}. */
    @Bean(name = "masterDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }

    /** First slave data source, bound from {@code spring.datasource.slave[0]}. */
    @Bean(name = "slave1DataSource")
    @ConfigurationProperties(prefix = "spring.datasource.slave[0]")
    public DataSource slave1DataSource() {
        return DataSourceBuilder.create().build();
    }

    /** Second slave data source, bound from {@code spring.datasource.slave[1]}. */
    @Bean(name = "slave2DataSource")
    @ConfigurationProperties(prefix = "spring.datasource.slave[1]")
    public DataSource slave2DataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Primary data source: routes to master or one of the slaves by {@link DSNames} key,
     * with the master as the default target.
     *
     * @return the routing data source
     */
    @Primary
    @Bean(name = "dataSource")
    @Qualifier(value = "dataSource")
    @DependsOn({"masterDataSource", "slave1DataSource", "slave2DataSource"})
    public DataSource dynamicDataSource() {
        DataSource master = masterDataSource();

        // One routing entry per DSNames constant.
        Map<Object, Object> routes = new HashMap<>(3);
        routes.put(DSNames.MASTER, master);
        routes.put(DSNames.SLAVE1, slave1DataSource());
        routes.put(DSNames.SLAVE2, slave2DataSource());

        DynamicDataSource router = new DynamicDataSource();
        router.setTargetDataSources(routes);
        router.setDefaultTargetDataSource(master);
        return router;
    }

    /** Transaction manager bound to the routing data source. */
    @Bean
    public PlatformTransactionManager transactionManager() {
        DataSource routing = dynamicDataSource();
        return new DataSourceTransactionManager(routing);
    }
}
编码实现接口切面拦截
DataSourceAspect.java
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
/**
 * Aspect that selects the data source for service-implementation methods annotated
 * with {@code @Slave} (read) or {@code @Master} (write). Declared with {@code @Order(1)}.
 */
@Slf4j
@Aspect
@Order(1)
public class DataSourceAspect {

    /** Methods under {@code service.impl} that carry the {@code @Slave} annotation. */
    @Pointcut("@annotation(com.ssic.payment.core.datasource.Slave) && execution(* com.ssic.payment.*.service.impl.*.*(..))")
    public void readPointcut() { }

    /** Methods under {@code service.impl} that carry the {@code @Master} annotation. */
    @Pointcut("@annotation(com.ssic.payment.core.datasource.Master) && execution(* com.ssic.payment.*.service.impl.*.*(..))")
    public void writePointcut() { }

    /** Switches the current thread to a slave before a read method runs. */
    @Before("readPointcut()")
    public void readBefore(JoinPoint joinPoint) {
        DSContextHolder.slave();
        trace("{}.{} USE DATASOURCE SLAVE", joinPoint);
    }

    // If this advice is removed, running a test class with transactions enabled fails:
    // the test's transaction starts before the aspect runs, so the master/slave switch
    // goes wrong.
    // Fix: reset the data source to master before the aspect exits.
    @After("readPointcut()")
    public void readAfter(JoinPoint joinPoint) {
        DSContextHolder.master();
        trace("{}.{} RESET DATASOURCE MASTER", joinPoint);
    }

    /** Switches the current thread to the master before a write method runs. */
    @Before("writePointcut()")
    public void writeBefore(JoinPoint joinPoint) {
        DSContextHolder.master();
        trace("{}.{} USE DATASOURCE MASTER", joinPoint);
    }

    /** Logs the target class name and method name of the join point at debug level. */
    private static void trace(String template, JoinPoint joinPoint) {
        String owner = joinPoint.getTarget().getClass().getName();
        String method = joinPoint.getSignature().getName();
        log.debug(template, owner, method);
    }
}
Master.java
/**
 * Marker annotation for methods that must use the master data source.
 * It is matched by the {@code writePointcut} of {@code DataSourceAspect}.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface Master {
}
Slave.java
/**
 * Marker annotation for methods that may read from a slave data source.
 * It is matched by the {@code readPointcut} of {@code DataSourceAspect}.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface Slave {
}
DSNames.java
/**
 * Keys identifying the data sources that can be routed to: the master and two slaves.
 */
public enum DSNames {
    // Hard-coding one constant per slave is not a good approach;
    // see the improvement notes later in this document.
    MASTER,
    SLAVE1,
    SLAVE2
}
DSContextHolder.java
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Holds the data-source key chosen for the current thread and picks slaves in
 * round-robin order.
 *
 * <p>The key is stored in a {@link ThreadLocal}; {@link #get()} returns {@code null}
 * on a thread where nothing has been set yet.
 */
@Slf4j
public class DSContextHolder {
    private static final ThreadLocal<DSNames> contextHolder = new ThreadLocal<>();

    /** Free-running call counter; it is allowed to overflow (see {@link #slave()}). */
    private static final AtomicInteger counter = new AtomicInteger(0);

    /** Slaves in rotation order. */
    private static final DSNames[] SLAVES = {DSNames.SLAVE1, DSNames.SLAVE2};

    /**
     * Sets the data-source key for the current thread.
     *
     * @param dsType key to store
     */
    public static void set(DSNames dsType) {
        contextHolder.set(dsType);
    }

    /**
     * @return the key stored for the current thread, or {@code null} if none was set
     */
    public static DSNames get() {
        return contextHolder.get();
    }

    /** Selects the master for the current thread. */
    public static void master() {
        set(DSNames.MASTER);
    }

    /**
     * Selects the next slave for the current thread, alternating strictly between
     * SLAVE1 and SLAVE2 across calls.
     *
     * <p>A single atomic increment replaces the previous "increment, then check and
     * reset to -1" sequence. That sequence was not atomic, and the -1 start/reset value
     * produced a negative remainder, which sent two consecutive calls to SLAVE2 at every
     * wrap-around. {@code Math.floorMod} keeps the index non-negative even after the
     * counter overflows.
     */
    public static void slave() {
        int index = Math.floorMod(counter.getAndIncrement(), SLAVES.length);
        set(SLAVES[index]);
    }
}
DynamicDataSource.java
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.lang.Nullable;
/**
 * Routing data source whose lookup key is whatever {@code DSContextHolder} currently
 * holds for the calling thread.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * @return the key from {@code DSContextHolder.get()}; may be {@code null} when the
     *         thread has not selected a data source (presumably the default target
     *         configured on this data source is used then; confirm against Spring docs)
     */
    @Override
    @Nullable
    protected Object determineCurrentLookupKey() {
        final Object lookupKey = DSContextHolder.get();
        return lookupKey;
    }
}
单元测试
import com.ssic.payment.accounting.Application;
import com.ssic.payment.accounting.domain.Demo;
import com.ssic.payment.accounting.domain.enums.DemoStatus;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
/**
 * Create/read/update/delete round trip through {@code DemoService}, run inside a
 * transaction.
 */
@Slf4j
@Transactional // roll the data back once the test has finished
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {
    @Autowired
    public DemoService service;

    /** Saves a Demo, reads it back, updates its title, then deletes it. */
    @Test
    public void CRUDTest() {
        // CREATE
        Demo o = new Demo();
        o.setTitle("test");
        o.setAmount(1);
        o.setStatus(DemoStatus.UNPAY);
        o.setCreateTime(new Date()); // previously set twice; once is enough
        service.save(o);
        Assert.assertNotNull(o.getId());

        // READ
        o = service.findById(o.getId());
        // Check the entity itself first so a missing row fails as an assertion, not an NPE.
        Assert.assertNotNull(o);
        Assert.assertNotNull(o.getId());

        // UPDATE
        o.setTitle("CRUDTest1");
        service.save(o);
        o = service.findById(o.getId());
        Assert.assertNotNull(o);
        // assertEquals reports expected vs. actual on failure, unlike assertTrue(equals).
        Assert.assertEquals("CRUDTest1", o.getTitle());

        // DELETE
        service.delete(o.getId());
        o = service.findById(o.getId());
        Assert.assertNull(o);
    }
}
注意
- 默认使用master库(为了在测试的时候支持事务)
- aop切面切换数据源后,需要在退出时,将数据源重置到master
- 使用Order指定切面先于事务执行(DataSourceAspect的@Order(1)小于@EnableTransactionManagement(order = 100),数值越小优先级越高)
改进
- 从库的个数是硬编码到系统中的,然后通过DSNames去切换,这样并不容易动态配置,可以改造为动态配置的
DataSourceConfig.java
import com.ssic.payment.core.datasource.DSNames;
import com.ssic.payment.core.datasource.DataSourceAspect;
import com.ssic.payment.core.datasource.DynamicDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Spring configuration for read/write splitting with a configurable number of slaves.
 *
 * <p>The master is bound from {@code spring.datasource.master}; every entry of
 * {@code spring.datasource.slave} becomes a read-only Hikari pool keyed by its
 * {@code pool-name}. All of them are placed behind a routing {@link DynamicDataSource}
 * whose default target is the master.
 */
@Configuration
@EnableTransactionManagement(order = 100)
@ConditionalOnClass(JdbcOperations.class)
@EnableConfigurationProperties({MutiDataSourceProperties.class})
@ConditionalOnProperty(prefix = "spring.datasource", name = "master.jdbc-url")
public class DataSourceConfig {

    private static final String POOL_NAME_KEY = "pool-name";
    private static final String DRIVER_CLASS_NAME_KEY = "driver-class-name";
    private static final String JDBC_URL_KEY = "jdbc-url";
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String MAXIMUM_POOL_SIZE_KEY = "maximum-pool-size";
    private static final String MINIMUM_IDLE_KEY = "minimum-idle";

    @Resource
    MutiDataSourceProperties mutiDataSourceProperties;

    /**
     * Registers the switching aspect and hands it the names of all configured slaves.
     *
     * @return the aspect, initialised with the slave pool names (possibly empty)
     * @throws IllegalStateException if a slave entry has no {@code pool-name}
     */
    @Bean
    public DataSourceAspect DataSourceAspect() {
        List<String> slaves = new ArrayList<>();
        for (Map<String, Object> prop : slaveProperties()) {
            slaves.add(requirePoolName(prop));
        }
        return new DataSourceAspect(slaves);
    }

    /** Master data source, bound from {@code spring.datasource.master}. */
    @Bean(name = "masterDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Primary data source: routes to the master (key {@code DSNames.MASTER.name()}) or to
     * a slave (key = its pool name), with the master as the default target.
     *
     * @return the routing data source
     * @throws IllegalStateException if a slave entry has no {@code pool-name}
     */
    @Primary
    @Bean(name = "dataSource")
    @Qualifier(value = "dataSource")
    @DependsOn({"masterDataSource"})
    public DataSource dynamicDataSource() {
        // No cast to HikariDataSource: the master is only used through the DataSource type.
        DataSource masterDataSource = masterDataSource();

        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DSNames.MASTER.name(), masterDataSource);

        // Add the read-only slaves. The routing key is the same pool name that
        // DataSourceAspect() passes to the aspect, so the two always agree.
        // NOTE(review): these pools are created here and are not Spring beans, so
        // presumably nothing closes them on shutdown; confirm.
        for (Map<String, Object> prop : slaveProperties()) {
            String poolName = requirePoolName(prop);
            targetDataSources.put(poolName, buildDataSource(prop, true));
        }

        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }

    /** Transaction manager bound to the routing data source. */
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dynamicDataSource());
    }

    /** Returns the configured slave entries, or an empty list when none are configured. */
    private List<Map<String, Object>> slaveProperties() {
        List<Map<String, Object>> configured = mutiDataSourceProperties.getSlave();
        if (configured == null) {
            return new ArrayList<>();
        }
        return configured;
    }

    /** Returns the entry's pool name, failing with a clear message when it is missing. */
    private static String requirePoolName(Map<String, Object> prop) {
        Object poolName = prop.get(POOL_NAME_KEY);
        if (poolName == null) {
            throw new IllegalStateException(
                    "Every spring.datasource.slave entry must define '" + POOL_NAME_KEY + "'");
        }
        return poolName.toString();
    }

    /** Returns the value for {@code key} as a string, or {@code null} if absent or null. */
    private static String text(Map<String, Object> map, String key) {
        Object value = map.get(key);
        return value == null ? null : value.toString();
    }

    /**
     * Builds a Hikari pool from one configuration entry. Keys that are absent are left
     * at Hikari's defaults.
     *
     * @param map      settings of one data source, keyed by property name
     * @param readOnly whether the pool is marked read-only; {@code null} counts as false
     * @return the configured pool
     */
    private HikariDataSource buildDataSource(Map<String, Object> map, Boolean readOnly) {
        HikariDataSource dataSource = new HikariDataSource();

        String poolName = text(map, POOL_NAME_KEY);
        if (poolName != null) {
            dataSource.setPoolName(poolName);
        }
        String driverClassName = text(map, DRIVER_CLASS_NAME_KEY);
        if (driverClassName != null) {
            dataSource.setDriverClassName(driverClassName);
        }
        String jdbcUrl = text(map, JDBC_URL_KEY);
        if (jdbcUrl != null) {
            dataSource.setJdbcUrl(jdbcUrl);
        }
        String username = text(map, USERNAME_KEY);
        if (username != null) {
            dataSource.setUsername(username);
        }
        String password = text(map, PASSWORD_KEY);
        if (password != null) {
            dataSource.setPassword(password);
        }
        String maximumPoolSize = text(map, MAXIMUM_POOL_SIZE_KEY);
        if (maximumPoolSize != null) {
            dataSource.setMaximumPoolSize(Integer.parseInt(maximumPoolSize));
        }
        String minimumIdle = text(map, MINIMUM_IDLE_KEY);
        if (minimumIdle != null) {
            dataSource.setMinimumIdle(Integer.parseInt(minimumIdle));
        }

        // Avoid unboxing a null Boolean.
        dataSource.setReadOnly(Boolean.TRUE.equals(readOnly));
        return dataSource;
    }
}
/**
 * Raw {@code spring.datasource} settings: one {@code master} map and a list of
 * {@code slave} maps, each keyed by property name (for example {@code pool-name}).
 */
@ConfigurationProperties(prefix = "spring.datasource")
class MutiDataSourceProperties {
    /** Settings of the master data source; {@code null} until bound. */
    private Map<String, Object> master;

    /** Settings of each slave; starts empty so callers can iterate without a null check. */
    private List<Map<String, Object>> slave = new ArrayList<>();

    /**
     * @return the master settings, or {@code null} if none were bound
     */
    public Map<String, Object> getMaster() {
        return master;
    }

    public void setMaster(Map<String, Object> master) {
        this.master = master;
    }

    /**
     * @return the slave settings; never {@code null}, empty when no slave is configured
     */
    public List<Map<String, Object>> getSlave() {
        return slave;
    }

    /**
     * @param slave the slave settings; {@code null} is stored as an empty list
     */
    public void setSlave(List<Map<String, Object>> slave) {
        if (slave == null) {
            this.slave = new ArrayList<>();
        } else {
            this.slave = slave;
        }
    }
}
DataSourceAspect.java
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import java.util.List;
/**
 * Aspect that selects the data source for service-implementation methods annotated
 * with {@code @Slave} (read) or {@code @Master} (write). Declared with {@code @Order(1)}.
 */
@Slf4j
@Aspect
@Order(1)
public class DataSourceAspect {

    /**
     * Stores the names of the slave data sources when the aspect is created.
     *
     * @param slaves names of the slave data sources, passed on to {@code DSContextHolder}
     */
    public DataSourceAspect(List<String> slaves) {
        DSContextHolder.setSlaves(slaves);
    }

    /** Methods under {@code service.impl} that carry the {@code @Slave} annotation. */
    @Pointcut("@annotation(com.ssic.payment.core.datasource.Slave) && execution(* com.ssic.payment.*.service.impl.*.*(..))")
    public void readPointcut() { }

    /** Methods under {@code service.impl} that carry the {@code @Master} annotation. */
    @Pointcut("@annotation(com.ssic.payment.core.datasource.Master) && execution(* com.ssic.payment.*.service.impl.*.*(..))")
    public void writePointcut() { }

    /** Switches the current thread to a slave before a read method runs. */
    @Before("readPointcut()")
    public void readBefore(JoinPoint joinPoint) {
        DSContextHolder.slave();
        trace("{}.{} USE DATASOURCE SLAVE", joinPoint);
    }

    /** Resets the current thread to the master once a read method has finished. */
    @After("readPointcut()")
    public void readAfter(JoinPoint joinPoint) {
        DSContextHolder.master();
        trace("{}.{} RESET DATASOURCE MASTER", joinPoint);
    }

    /** Switches the current thread to the master before a write method runs. */
    @Before("writePointcut()")
    public void writeBefore(JoinPoint joinPoint) {
        DSContextHolder.master();
        trace("{}.{} USE DATASOURCE MASTER", joinPoint);
    }

    /** Logs the target class name and method name of the join point at debug level. */
    private static void trace(String template, JoinPoint joinPoint) {
        String owner = joinPoint.getTarget().getClass().getName();
        String method = joinPoint.getSignature().getName();
        log.debug(template, owner, method);
    }
}
DSContextHolder.java
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Holds the data-source name chosen for the current thread and picks slaves in
 * round-robin order from a configurable list.
 *
 * <p>The name is stored in a {@link ThreadLocal}; {@link #get()} returns {@code null}
 * on a thread where nothing has been set yet.
 */
@Slf4j
public class DSContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    /** Free-running call counter; it is allowed to overflow (see {@link #slave()}). */
    private static final AtomicInteger counter = new AtomicInteger(0);

    /** Names of the slave data sources; {@code null} until {@link #setSlaves} is called. */
    private static List<String> slaves;

    /**
     * Replaces the list of slave names used by {@link #slave()}.
     *
     * @param slaves slave data-source names; {@code null} or empty means "no slaves"
     */
    public static void setSlaves(List<String> slaves) {
        DSContextHolder.slaves = slaves;
    }

    /**
     * Sets the data-source name for the current thread.
     *
     * @param dsName name to store
     */
    public static void set(String dsName) {
        contextHolder.set(dsName);
    }

    /**
     * @return the name stored for the current thread, or {@code null} if none was set
     */
    public static String get() {
        return contextHolder.get();
    }

    /** Selects the master for the current thread. */
    public static void master() {
        set(DSNames.MASTER.name());
    }

    /**
     * Selects the next slave for the current thread in round-robin order, or the master
     * when no slaves are configured.
     *
     * <p>The index comes from a single atomic increment reduced with
     * {@code Math.floorMod}, so it is always within {@code [0, size)}. The previous
     * implementation reset the counter to -1 after 9999 calls, and the following call
     * then computed {@code -1 % size == -1} and failed in {@code slaves.get(-1)}
     * whenever more than one slave was configured. When the counter overflows, the
     * rotation may skip ahead once if the list size does not divide 2^32; it never
     * goes out of range.
     */
    public static void slave() {
        // Read the field once so the size check and the lookup use the same list.
        List<String> candidates = slaves;
        if (candidates == null || candidates.isEmpty()) {
            master();
            return;
        }
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        set(candidates.get(index));
    }
}
DSNames.java
/**
 * Data-source roles. {@code MASTER.name()} is the key under which the master data
 * source is registered and selected; slaves are keyed by their configured pool names.
 */
public enum DSNames {
    MASTER,
    SLAVE
}