前言
上章我们通过jpa和atomikos实现了分布式事务的处理案例。这节,我们来实现jdbc多数据源+atomikos的方式来实现分布式事务的处理案例。
Atomikos介绍
Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。我们通过它来管理事务。springboot本身对其有很好的支持,依赖为spring-boot-starter-jta-atomikos。
创建空项目
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
添加配置
application.yml:
spring:
datasource:
master:
username: root
password: 123456
url: jdbc:mysql://192.168.145.131:3306/test
driver-class-name: com.mysql.cj.jdbc.Driver
slave:
username: root
password: 123456
url: jdbc:mysql://192.168.145.131:3306/test2
driver-class-name: com.mysql.cj.jdbc.Driver
建库
创建test、test2库
test:
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`age` int(11) NOT NULL,
`grade` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
test2:
CREATE TABLE `teacher` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`age` int(11) NOT NULL,
`course` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
完善
目录结构
根据目录结构,请自行创建package和class。
config/DataSourceConfig
package com.mrcoder.sbjdbcmultidbatomikos.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.sql.SQLException;
@Configuration
@EnableTransactionManagement(proxyTargetClass = true)
public class DataSourceConfig {
//事务管理器配置start
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(true);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, atomikosTransactionManager);
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}
//master数据源配置
@Primary
@Bean(name = "masterDataSourceProperties")
@Qualifier("masterDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSourceProperties masterDataSourceProperties() {
return new DataSourceProperties();
}
@Primary
@Bean(name = "masterDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(masterDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(masterDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(masterDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("xads1");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Bean(name = "masterJdbcTemplate")
public JdbcTemplate masterJdbcTemplate() throws SQLException {
JdbcTemplate jdbcTemplate = new JdbcTemplate(masterDataSource());
return jdbcTemplate;
}
//slave数据源配置
@Bean(name = "slaveDataSourceProperties")
@Qualifier("slaveDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSourceProperties slaveDataSourceProperties() {
return new DataSourceProperties();
}
@Bean(name = "slaveDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(slaveDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(slaveDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(slaveDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("xads2");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Bean(name = "slaveJdbcTemplate")
public JdbcTemplate slaveJdbcTemplate() throws SQLException {
JdbcTemplate jdbcTemplate = new JdbcTemplate(slaveDataSource());
return jdbcTemplate;
}
}
entity/Student
package com.mrcoder.sbjdbcmultidbatomikos.entity;
import java.io.Serializable;
public class Student implements Serializable {
private int id;
private String name;
private int age;
private int grade;
public Student() {
}
public Student(String name, int age, int grade) {
this.name = name;
this.age = age;
this.grade = grade;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
", grade=" + grade +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getGrade() {
return grade;
}
public void setGrade(int grade) {
this.grade = grade;
}
}
entity/Teacher
package com.mrcoder.sbjdbcmultidbatomikos.entity;
import java.io.Serializable;
public class Teacher implements Serializable {
private int id;
private String name;
private int age;
private int course;
public Teacher() {
}
public Teacher(String name, int age, int course) {
this.name = name;
this.age = age;
this.course = course;
}
@Override
public String toString() {
return "Teacher{" +
"id=" + id +
", name='" + name + '\'' +
", age='" + age + '\'' +
", course='" + course + '\'' +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getCourse() {
return course;
}
public void setCourse(int course) {
this.course = course;
}
}
dao/StuentDao
package com.mrcoder.sbjdbcmultidbatomikos.dao;
import com.mrcoder.sbjdbcmultidbatomikos.entity.Student;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@Repository
public class StudentDao {
@Resource(name = "masterJdbcTemplate")
private JdbcTemplate masterJdbcTemplate;
@Resource(name = "slaveJdbcTemplate")
private JdbcTemplate slaveJdbcTemplate;
class StudentMapper implements RowMapper<Student> {
@Override
public Student mapRow(ResultSet resultSet, int i) throws SQLException {
Student student = new Student();
student.setAge(resultSet.getInt("age"));
student.setGrade(resultSet.getInt("grade"));
student.setName(resultSet.getString("name"));
return student;
}
}
public int save(Student student) {
String sql = "INSERT INTO `test`.`student` (`age`, `grade`,`name`) VALUES (?, ?, ?)";
int result = masterJdbcTemplate.update(sql, new Object[]{student.getAge(), student.getGrade(), student.getName()});
return result;
}
public List<Student> getList() {
String sql = "select * from student s";
List<Student> list = masterJdbcTemplate.query(sql, new StudentMapper());
return list;
}
}
dao/TeacherDao
package com.mrcoder.sbjdbcmultidbatomikos.dao;
import com.mrcoder.sbjdbcmultidbatomikos.entity.Student;
import com.mrcoder.sbjdbcmultidbatomikos.entity.Teacher;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@Repository
public class TeacherDao {
@Resource(name = "masterJdbcTemplate")
private JdbcTemplate masterJdbcTemplate;
@Resource(name = "slaveJdbcTemplate")
private JdbcTemplate slaveJdbcTemplate;
class TeacherMapper implements RowMapper<Teacher> {
@Override
public Teacher mapRow(ResultSet resultSet, int i) throws SQLException {
Teacher teacher = new Teacher();
teacher.setAge(resultSet.getInt("age"));
teacher.setCourse(resultSet.getInt("course"));
teacher.setName(resultSet.getString("name"));
return teacher;
}
}
public int save(Teacher teacher) {
String sql = "INSERT INTO `test2`.`teacher` (`age`, `course`,`name`) VALUES (?, ?, ?)";
int result = slaveJdbcTemplate.update(sql, new Object[]{teacher.getAge(), teacher.getCourse(), teacher.getName()});
return result;
}
public List<Teacher> getList() {
String sql = "select * from teacher t ";
List<Teacher> list = slaveJdbcTemplate.query(sql, new TeacherMapper());
return list;
}
}
service/CurdService
package com.mrcoder.sbjdbcmultidbatomikos.service;
import com.mrcoder.sbjdbcmultidbatomikos.dao.StudentDao;
import com.mrcoder.sbjdbcmultidbatomikos.dao.TeacherDao;
import com.mrcoder.sbjdbcmultidbatomikos.entity.Student;
import com.mrcoder.sbjdbcmultidbatomikos.entity.Teacher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional(transactionManager = "transactionManager", propagation = Propagation.REQUIRED, readOnly = false, rollbackFor = {Exception.class})
public class CurdService {
@Autowired
private StudentDao studentDao;
@Autowired
private TeacherDao teacherDao;
public void add(int code) {
Student s1 = new Student();
s1.setAge(1);
s1.setGrade(1);
s1.setName("s1");
studentDao.save(s1);
Teacher t1 = new Teacher();
t1.setAge(1);
t1.setName("t1");
t1.setCourse(1);
teacherDao.save(t1);
int result = 1 / code;
}
}
controller/JdbcAtomikosController
package com.mrcoder.sbjdbcmultidbatomikos.controller;
import com.mrcoder.sbjdbcmultidbatomikos.dao.StudentDao;
import com.mrcoder.sbjdbcmultidbatomikos.dao.TeacherDao;
import com.mrcoder.sbjdbcmultidbatomikos.service.CurdService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class JdbcAtomikosController {
@Autowired
private StudentDao studentDao;
@Autowired
private TeacherDao teacherDao;
@Autowired
private CurdService curdService;
@RequestMapping("/commit")
public void add() {
curdService.add(1);
}
@RequestMapping("/rollback")
public void rollback() {
curdService.add(0);
}
@RequestMapping("/list")
public void list() {
System.out.println(studentDao.getList());
System.out.println(teacherDao.getList());
}
}
运行
http://localhost:8080/commit 会在test库的student和test2库的teacher表中各新增一条记录
http://localhost:8080/rollback 人为的制造1/0的异常,异常触发事务,会发现两张表都不会新增记录。
项目地址
https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-jdbc-multidb-atomikos
https://gitee.com/MrCoderStack/SpringBootDemo/tree/master/sb-jdbc-multidb-atomikos
注意点
请一定注意,两张表为innodb引擎,若出现分布式事务无法触发,请优先查看表引擎。