spring-boot+mybatis+druid+atomikos(支持分布式事务)

1.多数据源事务管控

    之前写了一篇基于注解来动态切换数据源的demo,但是那个demo是不支持多数据源的事务的,也就是说在执行多数据源数据改动操作的时候,如果其中某个数据源发生异常,之前操作的数据源的事务已经提交不会回滚,只有发生异常的数据库才会回滚事务,这就导致了事务的不一致性,这里我参考了网上的大量文章,在基于注解动态切换数据源的基础上改进,使用atomikos分布式事务来对多个数据源资源的事务进行管控,从而实现多数据源的事务一致性


2 项目搭建

  • 2.1 pom依赖和yml配置
  • 2.1.1 pom.xml

pom依赖和上篇文章差不多,只是额外添加了atomikos的依赖,以及apache下的commons-lang3的jar包依赖,mysql的版本需要使用6.0.6,版本高了,启动会报错

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--<parent>-->
        <!--<groupId>org.springframework.boot</groupId>-->
        <!--<artifactId>spring-boot-starter-parent</artifactId>-->
        <!--<version>2.2.1.RELEASE</version>-->
        <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
    <!--</parent>-->
    <groupId>com.sccl</groupId>
    <artifactId>data_source_change</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>data_source_change</name>
    <description>Demo project for Spring Boot</description>

    <!--不继承spring-boot-starter-parent,使用依赖管理-->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.1.6.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- SpringBoot Web容器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- SpringBoot 拦截器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!-- SpringBoot集成mybatis框架 -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!--可以不使用mybatis-plus中的会话工厂来完成数据源切换了,故而注释此依赖-->
        <!--<dependency>-->
            <!--<groupId>com.baomidou</groupId>-->
            <!--<artifactId>mybatis-plus-boot-starter</artifactId>-->
            <!--<version>3.1.0</version>-->
        <!--</dependency>-->
        <!--阿里数据库连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <!-- Mysql驱动包 这里请使用6.0.6版本的mysql,版本高了会报错-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
            <!--<scope>runtime</scope>-->
        </dependency>
        <!--atomikos分布式事务-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <!--防止@ConfigurationProperties属性注入爆错,引入此依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • 2.1.2 yml文件

yml文件中,主要改动是将type改成xa类型的数据源,这样才能让多个数据源资源被atomoikos管理

server:
  port: 8099
  servlet:
    context-path: /data
spring:
  datasource:
    druid:
      # 注意(名称不支持大写和下划线可用中横线 比如 错误 的命名(slave_**, slaveTwo))
      master: #主库(数据源-1)
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/chapter05-1?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
        username: root
        password: 123456
      slave: #从库(数据源-2)
        open: true
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/chapter05-2?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
        username: root
        password: 123456
    #jta相关参数配置
  jta:
    log-dir: classpath:tx-logs
    transaction-manager-id: txManager
#mybatis的配置在会话工厂里面配置,在这里配置会报错
#mybatis:
#  type-aliases-package: com.sccl.data_source_change.*.domain #包别名
#  mapper-locations: classpath*:mybatis/**/*.xml #扫描mapper映射文件
  • 2.2 项目代码重构

项目目录:


项目目录
  • 2.2.1 自定义注解和切面

代码基本没改动

DataSource自定义注解

package com.sccl.data_source_change.aspectj.annotation;


import com.sccl.data_source_change.enumConst.DataSourceEnum;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**自定义多数据源切换注解
 * Create by wangbin
 * 2019-11-18-15:25
 */

/**
 * 注解说明:
 * @author wangbin
 * @date 2019/11/18 15:36

源码样例:

 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
 @Documented
 @Inherited
 public @interface MthCache {
 String key();
 }

 @Target 注解

 功能:指明了修饰的这个注解的使用范围,即被描述的注解可以用在哪里。

 ElementType的取值包含以下几种:

 TYPE:类,接口或者枚举
 FIELD:域,包含枚举常量
 METHOD:方法
 PARAMETER:参数
 CONSTRUCTOR:构造方法
 LOCAL_VARIABLE:局部变量
 ANNOTATION_TYPE:注解类型
 PACKAGE:包
 =======================================================================================
 @Retention 注解

 功能:指明修饰的注解的生存周期,即会保留到哪个阶段。

 RetentionPolicy的取值包含以下三种:

 SOURCE:源码级别保留,编译后即丢弃。
 CLASS:编译级别保留,编译后的class文件中存在,在jvm运行时丢弃,这是默认值。
 RUNTIME: 运行级别保留,编译后的class文件中存在,在jvm运行时保留,可以被反射调用。

 ====================================================================================
 @Documented 注解

 功能:指明修饰的注解,可以被例如javadoc此类的工具文档化,只负责标记,没有成员取值。
 ========================================================================================
 @Inherited注解

 功能:允许子类继承父类中的注解。

 注意!:

 @interface意思是声明一个注解,方法名对应参数名,返回值类型对应参数类型。
 */
 @Target(ElementType.METHOD) //此注解使用于方法上
 @Retention(RetentionPolicy.RUNTIME) //此注解的生命周期为:运行时,在编译后的class文件中存在,在jvm运行时保留,可以被反射调用
public @interface DataSource {
    /**
     * 切换数据源值
     */
    DataSourceEnum value() default DataSourceEnum.MASTER;
}

DsAspect数据源动态切换的切面

package com.sccl.data_source_change.aspectj;

import com.sccl.data_source_change.aspectj.annotation.DataSource;
import com.sccl.data_source_change.datasource.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * 多数据源处理切面
 * 事务管理:
 * 事务管理在开启时,需要确定数据源,也就是说数据源切换要在事务开启之前,
 * 我们可以使用Order来配置执行顺序,在AOP实现类上加Order注解,
 * 就可以使数据源切换提前执行,order值越小,执行顺序越靠前。
 * Create by wangbin
 * 2019-11-18-15:55
 */
@Aspect
@Order(1) //order值越小,执行顺序越靠前。<!-- 设置切换数据源的优先级 -->
@Component
public class DsAspect {
    protected Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 所有添加了DataSource自定义注解的方法都进入切面
     */
    @Pointcut("@annotation(com.sccl.data_source_change.aspectj.annotation.DataSource)")
    public void dsPointCut() {

    }
    // 这里使用@Around,在调用目标方法前,进行aop拦截,通过解析注解上的值来切换数据源。
    // 在调用方法结束后,清除数据源。
    // 也可以使用@Before和@After来编写,原理一样,这里就不多说了。
    @Around("dsPointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        if (method.isAnnotationPresent(DataSource.class)) {
            //获取方法上的注解
            DataSource dataSource = method.getAnnotation(DataSource.class);
            if (dataSource != null) {
                //切换数据源
                DynamicDataSourceContextHolder.setDB(dataSource.value().getName());
            }
        }
        try {
            return point.proceed();
        } finally {
            // 销毁数据源 在执行方法之后
            DynamicDataSourceContextHolder.clearDB();
        }
    }
}

  • 2.2.2 数据源枚举

枚举 DataSourceEnum

package com.sccl.data_source_change.enumConst;

/**
 * Create by wangbin
 * 2019-11-19-16:54
 */
public enum DataSourceEnum {
    MASTER("master"),
    SLAVE("slave");
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

     DataSourceEnum(String name) {
        this.name = name;
    }
}
  • 2.2.3 动态数据源和动态数据源环境变量

DynamicDataSource动态数据源

package com.sccl.data_source_change.datasource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.Map;

/** 动态数据源
 * Create by wangbin
 * 2019-11-18-16:06
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources){
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.getDB();
    }
}

DynamicDataSourceContextHolder动态数据源环境变量控制

package com.sccl.data_source_change.datasource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** 当前线程数据源,负责管理数据源的环境变量
 * Create by wangbin
 * 2019-11-18-16:11
 */
public class DynamicDataSourceContextHolder {
    public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);
    /**
     * 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
     *  所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
     */
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    /**
     * 设置数据源名
     */
    public static void setDB(String dbType){
        log.info("切换到{}数据源", dbType);
        CONTEXT_HOLDER.set(dbType);
    }
    /**
     * 获取数据源名
     */
    public static String getDB(){
        return CONTEXT_HOLDER.get();
    }
    /**
     * 清理数据源名
     */
    public static void clearDB(){
        CONTEXT_HOLDER.remove();
    }
}

  • 2.2.4 数据源配置,分布式事务管理器,多数据源事务管理器,重写的mybatis会话工厂

数据源配置 DruidMutilConfig(项目代码的主要改动位置)

在数据源配置中,我们需要将master和slave数据库的druid数据库驱动换成xa的,同时要使用动态数据源来创建会话连接,这里和网上很多代码的不同之处在于,网上很多文章都是给每个数据源单独创建一个连接会话,然后进行切换和事务管理,同时还需要分包,一个数据源就要分一个包,这里只使用动态数据源来创建会话连接,切换到哪个数据源的时候就用该数据源来获取连接并管控事务很灵活,

采坑经历:
1.在这个配置中加入事务后动态数据源没法切换,需要重写Transaction,让我们能够动态的根据DatabaseType获取不同的Connection,并且要求不能影响整个事物的特性。
详情参考了:
springboot+mybatis解决多数据源切换事务控制不生效的问题

2.在yml文件中配置的mybatis扫描xml包路径和配置包别名,不起作用,所以在mybatis的会话工厂中配置的这两个属性,这里又有一个坑是mybatis的会话工厂不支持通配符配置包别名,所以参考了网上的文章,写一个类继承mybatis的会话工厂重写了配置包别名的方法,然后使用这个类来配置包别名和xml路径以及数据源和多数据源事务管理器
详情参考了:
typeAliasesPackage支持通配符包路径扫描

package com.sccl.data_source_change.config;

import com.sccl.data_source_change.datasource.DynamicDataSource;
import com.sccl.data_source_change.enumConst.DataSourceEnum;
import com.sccl.data_source_change.utils.PackagesSqlSessionFactoryBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.lang.Nullable;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * druid 配置多数据源
 *
 * @author sccl
 */
@Configuration
@EnableTransactionManagement //开启事务
@MapperScan("com.sccl.data_source_change.**.mapper")
public class DruidMutilConfig {


    @Bean(name = "masterDataSource")
    public DataSource masterDataSource(Environment env) {
        String sourceName = "master";
        Properties prop = build(env, "spring.datasource.druid.master.");
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        //druid的数据库驱动换成xa的
        xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        xaDataSource.setUniqueResourceName(sourceName);
        xaDataSource.setPoolSize(5);
        xaDataSource.setXaProperties(prop);
        return xaDataSource;
    }

    @Bean(name = "slaveDataSource")
    public DataSource slaveDataSource(Environment env) {
        String sourceName = "slave";
        Properties prop = build(env, "spring.datasource.druid.slave.");
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        //druid的数据库驱动换成xa的
        xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        xaDataSource.setUniqueResourceName(sourceName);
        xaDataSource.setPoolSize(5);
        xaDataSource.setXaProperties(prop);
        return xaDataSource;

    }

    private Properties build(Environment env, String prefix) {

        Properties prop = new Properties();
        prop.put("url", env.getProperty(prefix + "url"));
        prop.put("username", env.getProperty(prefix + "username"));
        prop.put("password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
        //这里只设置了简单的几个属性,如果想做更多的配置可以继续往下添加即可
        return prop;
    }

    /**
     * 动态数据源,在这继续添加 DataSource Bean
     */
    @Bean(name = "dynamicDataSource")
    @Primary
    public DynamicDataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Nullable @Qualifier("slaveDataSource") DataSource slaveDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DataSourceEnum.MASTER.getName(), masterDataSource);
        if (slaveDataSource != null){
            targetDataSources.put(DataSourceEnum.SLAVE.getName(), slaveDataSource);
        }
        // 还有数据源,在targetDataSources中继续添加
        return new DynamicDataSource(masterDataSource, targetDataSources);
    }

    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource)
            throws Exception {
        //参照的别人的代码说需要将会话工厂改成mybatis-plus的sql会话工厂,
        //经测试发现使用mybatis的会话工厂也可以运行,不会报错
//        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        //使用了PackagesSqlSessionFactoryBean继承SqlSessionFactoryBean,重写了配置别名的方法
        PackagesSqlSessionFactoryBean bean = new PackagesSqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        //设置多数据源分布式事务
        bean.setTransactionFactory(new MultiDataSourceTransactionFactory());
        bean.setTypeAliasesPackage("com.sccl.data_source_change.*.domain");//通配符设置包别名
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/**/*.xml"));// 扫描指定目录的xml
        return bean.getObject();
    }

    @Bean(name = "sqlSessionTemplate")
    @Primary
    public SqlSessionTemplate sqlSessionTemplate(
            @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

多数据源事务管理器和相应的工厂(参考别人的代码)
MultiDataSourceTransaction,MultiDataSourceTransactionFactory

MultiDataSourceTransaction

package com.sccl.data_source_change.config;

import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import com.sccl.data_source_change.datasource.DynamicDataSource;
import com.sccl.data_source_change.datasource.DynamicDataSourceContextHolder;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.commons.lang3.Validate.notNull;

/**
 * <P>多数据源切换,支持事务</P>
 *
 * @author lishuangqi
 * @date 2019/5/16 15:09
 * @since
 */
public class MultiDataSourceTransaction implements Transaction {
    private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);

    private final DataSource dataSource;

    private Connection mainConnection;

    private String mainDatabaseIdentification;

    private ConcurrentMap<String, Connection> otherConnectionMap;


    private boolean isConnectionTransactional;

    private boolean autoCommit;


    public MultiDataSourceTransaction(DataSource dataSource) {
        notNull(dataSource, "No DataSource specified");
        this.dataSource = dataSource;
        otherConnectionMap = new ConcurrentHashMap<>();
        mainDatabaseIdentification= DynamicDataSourceContextHolder.getDB();
    }


    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() throws SQLException {
        String databaseIdentification = DynamicDataSourceContextHolder.getDB();
        if (databaseIdentification.equals(mainDatabaseIdentification)) {
            if (mainConnection != null) return mainConnection;
            else {
                openMainConnection();
                mainDatabaseIdentification =databaseIdentification;
                return mainConnection;
            }
        } else {
            if (!otherConnectionMap.containsKey(databaseIdentification)) {
                try {
                    Connection conn = dataSource.getConnection();
                    otherConnectionMap.put(databaseIdentification, conn);
                } catch (SQLException ex) {
                    throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
                }
            }
            return otherConnectionMap.get(databaseIdentification);
        }

    }


    private void openMainConnection() throws SQLException {
        this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.mainConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "JDBC Connection ["
                            + this.mainConnection
                            + "] will"
                            + (this.isConnectionTransactional ? " " : " not ")
                            + "be managed by Spring");
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void commit() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.commit();
            for (Connection connection : otherConnectionMap.values()) {
                connection.commit();
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void rollback() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.rollback();
            for (Connection connection : otherConnectionMap.values()) {
                connection.rollback();
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() throws SQLException {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : otherConnectionMap.values()) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
    }

    @Override
    public Integer getTimeout() throws SQLException {
        return null;
    }
}

MultiDataSourceTransactionFactory

package com.sccl.data_source_change.config;



import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;

import javax.sql.DataSource;

/**
 * <P>支持Service内多数据源切换的Factory</P>
 *
 * @author lishuangqi
 * @date 2019/5/16 15:09
 * @since
 */
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MultiDataSourceTransaction(dataSource);
    }
}

XATransactionManagerConfig (分布式事务管理器)
上面重写的多数据源事务管理器是为了让我们能根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取导致数据源没法切换,这里配置的分布式事务管理器是为了让多数据源操作发生异常时,让多数据源的事务进行同步回滚,由于之前配置的数据源都是换成了支持xa协议的,所以多数据源的资源都在atomikos的管控下了,能够进行多数据源的事务回滚

package com.sccl.data_source_change.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/** 分布式事务管理器
 * Create by wangbin
 * 2019-11-21-18:10
 */
@Configuration
public class XATransactionManagerConfig {
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({ "userTransaction", "atomikosTransactionManager" })
    public PlatformTransactionManager transactionManager() throws Throwable {
        return new JtaTransactionManager(userTransaction(),atomikosTransactionManager());
    }
}

PackagesSqlSessionFactoryBean

这个类继承mybatis的SqlSessionFactoryBean,重写设置包别名的方法,使其支持通配符配置

package com.sccl.data_source_change.utils;

import org.apache.commons.lang3.StringUtils;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.type.classreading.CachingMetadataReaderFactory;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.util.ClassUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** 配置typeAliasesPackage支持通配符包路径扫描
 * 通过继承重写包路径读取方式来实现支持通配符配置,以前的SqlSessionFactoryBean
 * 不支持通配符设置包别名,所以重写该方法
 * Create by wangbin
 * 2019-11-25-17:18
 */
public class PackagesSqlSessionFactoryBean extends SqlSessionFactoryBean {
    private static final Logger logger = LoggerFactory.getLogger(PackagesSqlSessionFactoryBean.class);

    static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";

    @Override
    public void setTypeAliasesPackage(String typeAliasesPackage) {
        ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver();
        MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver);
        typeAliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX +
                ClassUtils.convertClassNameToResourcePath(typeAliasesPackage) + "/" + DEFAULT_RESOURCE_PATTERN;

        //将加载多个绝对匹配的所有Resource
        //将首先通过ClassLoader.getResource("META-INF")加载非模式路径部分
        //然后进行遍历模式匹配
        try {
            List<String> result = new ArrayList<String>();
            Resource[] resources =  resolver.getResources(typeAliasesPackage);
            if(resources != null && resources.length > 0){
                MetadataReader metadataReader = null;
                for(Resource resource : resources){
                    if(resource.isReadable()){
                        metadataReader =  metadataReaderFactory.getMetadataReader(resource);
                        try {
                            result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName());
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            if(result.size() > 0) {
                super.setTypeAliasesPackage(StringUtils.join(result.toArray(), ","));
            }else{
                logger.warn("参数typeAliasesPackage:"+typeAliasesPackage+",未找到任何包");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

  • 2.2.5 domain,controller,service,mapper,mapper.xml

这些都是常规的层了,只有一些小细节需要注意,这里为了方便直观测试,特意建了master,slave的包分开,其实可以不用分开的,mapper.xml就也分了包


方便直观测试,特意分的包

mapper.xml也分包了

Domain 实体类

Book

import lombok.Data;

/**
 * Create by wangbin
 * 2019-08-07-0:55
 */
@Data
public class Book {
    private Integer id;
    private String name;
    private String author;
}

User

package com.sccl.data_source_change.slave.domain;

import lombok.Data;

/**
 * Create by wangbin
 * 2019-11-21-18:13
 */
@Data
public class User {
    private Integer id;
    private Integer age;
    private String gender;
    private String name;
}

Controller层

直接在controller层测试多库读取和多库写入

package com.sccl.data_source_change.controller;


import com.sccl.data_source_change.master.domain.Book;
import com.sccl.data_source_change.master.service.BookService;
import com.sccl.data_source_change.slave.domain.User;
import com.sccl.data_source_change.slave.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**在controller层中注入不同的mapper实例,操作不同的数据源
 * Create by wangbin
 * 2019-08-07-1:26
 */
@RestController
public class BookController {
    @Autowired
    private BookService bookService;
    @Autowired
    private UserService userService;

    @GetMapping("/test1")//测试查询主从库的数据
    public String test1(){
        List<Book> books1 = bookService.getAllBooks();
        List<Book> books2 = bookService.getAllBooks2();
        System.out.println("books1:"+books1);
        System.out.println("books2:"+books2);
        List<User> users1 = userService.getAllUsers();
        System.out.println("user1:"+users1);
        return "OK";
    }
    @Transactional
    @GetMapping("/test2")//测试主从双库写入
    public String test2(){
        Book book = new Book();
        book.setName("罗宾逊");
        book.setAuthor("漂流记");
        int bookNumber = bookService.addBook(book);
        Book book2 = new Book();
        book2.setName("飞驰人生");
        book2.setAuthor("韩寒");
        int bookNumber2 = bookService.addBook2(book2);
        System.out.println("向master数据库添加数据:"+bookNumber);
        System.out.println("向slave数据库添加数据:"+bookNumber2);
        int number = 1/0;//自定义错误,查看事务是否回滚
        return "OK";
    }
    @Transactional
    @GetMapping("/test3")
    public String test3(){
        Book book = new Book();
        book.setName("master");
        book.setAuthor("master");
        int bookNumber = bookService.addBook(book);
        User user = new User();
        user.setAge(18);
        user.setGender("男");
        user.setName("slave");
        int userNumber = userService.addUser(user);
        int number = 1/0;
        return "OK";
    }
}

Service,ServiceImpl 服务层与实现类

BookService


import com.sccl.data_source_change.master.domain.Book;

import java.util.List;

/**
 * Create by wangbin
 * 2019-11-18-17:56
 */
public interface BookService  {
    List<Book> getAllBooks();
    List<Book> getAllBooks2();
    int addBook(Book book);
    int addBook2(Book book);

BookServiceImpl

package com.sccl.data_source_change.master.service;


import com.sccl.data_source_change.aspectj.annotation.DataSource;
import com.sccl.data_source_change.enumConst.DataSourceEnum;
import com.sccl.data_source_change.master.domain.Book;
import com.sccl.data_source_change.master.mapper.BookMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * Create by wangbin
 * 2019-11-18-17:57
 */
@Transactional
@Service
public class BookServiceImpl  implements BookService {
    @Autowired
    private BookMapper bookMapper;
    @DataSource(value = DataSourceEnum.MASTER)
    @Override
    public List<Book> getAllBooks() {
        return bookMapper.getAllBooks();
    }
    @DataSource(value = DataSourceEnum.SLAVE)
    @Override
    public List<Book> getAllBooks2() {
        return bookMapper.getAllBooks();
    }
    @DataSource(value = DataSourceEnum.MASTER)
    @Override
    public int addBook(Book book) {
        return bookMapper.addBook(book);
    }
    @DataSource(value = DataSourceEnum.SLAVE)
    @Override
    public int addBook2(Book book) {
        return bookMapper.addBook(book);
    }
}

UserService

package com.sccl.data_source_change.slave.service;


import com.sccl.data_source_change.slave.domain.User;

import java.util.List;

/**
 * Create by wangbin
 * 2019-11-21-18:18
 */
public interface UserService {
    int addUser(User user);
    List<User> getAllUsers();
}

UserServiceImpl

package com.sccl.data_source_change.slave.service;


import com.sccl.data_source_change.aspectj.annotation.DataSource;
import com.sccl.data_source_change.enumConst.DataSourceEnum;
import com.sccl.data_source_change.slave.domain.User;
import com.sccl.data_source_change.slave.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * Create by wangbin
 * 2019-11-21-18:19
 */
@Transactional
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;
    @DataSource(value = DataSourceEnum.SLAVE)
    @Override
    public int addUser(User user) {
        return userMapper.addUser(user);
    }
    @DataSource(value = DataSourceEnum.SLAVE)
    @Override
    public List<User> getAllUsers() {
        return userMapper.getAllUsers();
    }
}

mapper层

BookMapper

package com.sccl.data_source_change.master.mapper;



import com.sccl.data_source_change.master.domain.Book;

import java.util.List;

/**
 * Create by wangbin
 * 2019-08-07-1:18
 */
public interface BookMapper {
    List<Book> getAllBooks();
    int addBook(Book book);
}

UserMapper

package com.sccl.data_source_change.slave.mapper;


import com.sccl.data_source_change.slave.domain.User;

import java.util.List;

/**
 * Create by wangbin
 * 2019-11-21-18:20
 */
public interface UserMapper {
    int addUser(User user);
    List<User> getAllUsers();
}

mapper.xml文件

BookMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sccl.data_source_change.master.mapper.BookMapper">

    <select id="getAllBooks" resultType="Book">
        select * from book
    </select>
    <insert id="addBook" parameterType="Book">
        insert into book (name,author) values (#{name},#{author})
    </insert>
</mapper>

UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sccl.data_source_change.slave.mapper.UserMapper">

    <select id="getAllUsers" resultType="com.sccl.data_source_change.slave.domain.User">
        select * from user
    </select>
    <insert id="addUser" parameterType="com.sccl.data_source_change.slave.domain.User">
        insert into user (age,gender,name) values (#{age},#{gender},#{name})
    </insert>
</mapper>

3 测试

3.1 测试双库读取

访问 http://localhost:8099/data/test1

BookConreoller测试代码

BookServiceImpl测试代码

UserServiceImpl测试代码

断点调试:
第一次访问切换到master数据源

第一次访问切换到master数据源

第二次访问切换到slave数据源

第二次访问切换到slave数据源

第三次访问切换到slave数据源

第三次访问切换到slave数据源

后台打印结果:

后台打印结果

前端页面显示结果:

前端页面显示结果

后台数据库:

master库,book表数据

master库,book表数据

slave库,book表数据

slave库,book表数据

slave库,user表数据

slave库,user表数据

测试结果:成功查询了master库中的book数据,slave库中的book和user数据

小坑:这里要注意一下,要将@Transactional事务注解方法serviceImpl层,不能放到controller层,不然测试会发现在查询book数据的时候,数据库没有切换,但是查user数据的时候切换了,原因是在查book的时候,查询不同库中的book数据,方法都写在同一个bookService中的,要在同一个service中用不同方法访问不同的数据库需要将事务控制的注解加到serviceImpl层,如果是controller层调用不同service中的方法访问不同数据库,可以直接将事务控制的注解加在controller层

3.2 测试双库写入

访问:http://localhost:8099/data/test2

1.先进行正常测试
BookController测试代码
BookServiceImpl测试代码

后台打印结果:

双库写入数据

后台数据库:

maser库的book中成功添加一条数据

maser库的book中成功添加一条数据

slave库的book中成功添加一条数据

slave库的book中成功添加一条数据

前端页面结果:


前端页面结果
2.进行异常测试,看事务是否同步回滚
放开注释,制造异常,查看事务是否回滚

前端结果:


异常出现,回滚事务

后台打印结果:


发生异常了

后台数据库: 查看数据是否有没有加入进去,事务有没有同步回滚

maste库中数据编号还是9,是之前正常测试加进去的数据,可以看到book数据没有加进去,事务回滚了


image.png

slave库中的数据编号还是5,也是之前正常测试加入的数据,book的数据也没有加进去,事务也回滚了


image.png

测试结果:master,slave库中的数据都没有添加成功,事务都进行了同步回滚

3.进行异常测试,向不同库的不同表添加数据
异常测试,多库事务是否同步回滚
addBook
addUser

前端结果:


异常出现,事务回滚

后台打印结果:


异常出现

后台数据库:

master库中的book数据编号还是9,依旧是第一次正常测试添加的数据,本次添加的数据进行了事务回滚


master库事务回滚

slave库中的user数据只有最早拥有的第一条数据,很显然新的数据也没有加进去,事务回滚了


slave库数据回滚

测试结果:多库操作异常发生,多库事务同步回滚

4.进行正常测试,向不同库的不同表添加数据
正常测试

前端结果:


没发生异常,返回ok

后台打印结果:


没发生异常,数据库操作与切换成功

后台数据库结果:

master库book表中新加了一条数据

master库book表中新加了一条数据

slave库user表中新加了一条数据

slave库user表中新加了一条数据

测试结果:正常测试,向不同库的不同表中添加数据成功

最终结论:采用Atomikos统一管控了多数据源资源操作的事务,通过重写多数据源事务管理器使得在事务管控下能通过注解正常切换数据源,当多数据源操作出现异常时,Atomikos会对管控的多数据源事务进行同步回滚,未发生异常时,数据库正常操作执行

本Demo采用注解形式动态切换数据源,并且可以管控分布式事务,可以不用分包,如果需要添加更多的数据源可以在枚举和yml中简单配置一下即可,比较灵活

本项目参照了很多网上的文章,文章出处在文中已标注

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354