在分布式系统中,解决数据一致性问题都比较麻烦,虽然已经有很多基于消息中间件回调的方案,今天提供一个比较优秀的方案分布式事务Easta,这只是个入门教程,后续跟进
我的项目组合SpringBoot +MyBatis+Dubbo+Easta+Nacos
SpringBoot +MyBatis+Dubbo这个我就不在过多解释了,网上有很多解释
今天主要说的是Easta+Nacos
Nacos
阿里巴巴出品的一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。可以替代zookeeper,这里采用Nacos主要是Easta的官方提供的建议使用,也是用于Dubbo的注册服务,至于和zookeeper的区别,暂时没有研究到后期更近。
Nacos 安装
官方安装说明很简单,
- 下载最新包并解压
- 运行
cd nacos/bin
sh startup.sh -m standalone
好了这就运行了Nacos服务,通过info提示 你可以打http://127.0.0.1:8848/nacos/index.html能够看到Nacos的服务后台
seata
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),后来改名叫seata了,和社区一起共建开源分布式事务解决方案。Fescar 的愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。先上架构图
- 事务协调器(TC):维护全局和分支事务的状态,驱动全局提交或回滚。(这个是一个总的事务调度和管理,他主要管理每个分支事务,只要分支事务中有一个事务发生回滚,他就会调动所有的其他分支事务回滚,这个分支事务其实就是 其他微服务)
- Transaction Manager(TM):定义全局事务的范围:开始全局事务,提交或回滚全局事务。(这个其实就是分布式事务的首发事务,他是会注册到TC里面的,也就是这条分布式事务头)
- 资源管理器(RM):管理分支事务的资源,与TC通信以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。(这个其实就是各个微服务上的普遍事务,只不过事务的提交是有RM来控制)
安装
- 先下载安装包,解压后 进入bin 目录 运行
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
打开服务也就是TC - 安装依赖
<!-- dubbo相关依赖-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-bom</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
- application.properties
server.port=8082
dubbo.application.name=dubbo-registry-zookeeper-provider-sample
## 特别注意dubbo的注册服务为nacos
dubbo.registry.address=nacos://192.168.50.86:8848
dubbo.protocol.name=dubbo
## Random port
dubbo.protocol.port=20881
dubbo.consumer.check=false
dubbo.application.qosEnable=false
spring.datasource.url=jdbc:mysql://rm-m5e15q2904j706p79io.mysql.rds.aliyuncs.com:3306/eosplaygametest
spring.datasource.username=XXXX
spring.datasource.password=XXXXX
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
-
在resource里添加file.conf和registry.conf
image.png
file.conf内容我直接拷贝官网DEMO的配置 ,具体每个配置参数并不太了解,后续跟进
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "file_store/data"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
}
## database store
db {
driver_class = ""
url = ""
user = ""
password = ""
}
}
registry.conf 也是后续跟进
registry {
# file nacos
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul
type = "file"
file {
name = "file.conf"
}
}
以上配置好后需要 在数据源里写一个配置类
/**
* @Author: heshouyou
* @Description seata global configuration
* @Date Created in 2019/1/24 10:28
*/
@Configuration
public class SeataAutoConfig {
/**
* autowired datasource config
*/
@Autowired
private DataSourceProperties dataSourceProperties;
/**
* init durid datasource
*
* @Return: druidDataSource datasource instance
*/
@Bean
@Primary
public DruidDataSource druidDataSource(){
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
}
/**
* init datasource proxy
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
return new DataSourceProxy(druidDataSource);
}
/**
* init mybatis sqlSessionFactory. 这里主要是定义了 一个代理 类 用来代理mybatis, 应为目前就用到mybatis
* @Param: dataSourceProxy datasource proxy
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
}
/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner(){
return new GlobalTransactionScanner("order-gts-fescar-example", "my_test_tx_group");
}
}
使用时 只需要添加注解@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-fescar-example")
@Service
@RestController
public class TestController {
final Logger logger = LoggerFactory.getLogger(getClass());
@Reference
UserSerivce user;
@Autowired
user_roleMapper roleMapper;
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-fescar-example")
@RequestMapping("/t")
public void TestData() throws RuntimeException {
String n= user.GetUserName();
logger.info("插入入了"+n);
throw new RuntimeException("ssssss");
}
}
其中@GlobalTransactional 就是 TM ,分布式 事务从TestData()
开始 来看下GetUserName
调用的微服务方法
@Service
public class User implements UserSerivce {
final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
user_roleMapper roleMapper;
public String GetUserName(){
user_role role= new user_role();
role.setGuid("2222");
role.setIssystem("1");
role.setName("222");
role.setTitle("bbbbb");
int i= roleMapper.insert(role);
return ""+I;
}
}
补充一下
需要在数据库中添加一个表,不然Easta会报错
-- ----------------------------
-- Table structure for undo_log
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=94 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of undo_log
-- ----------------------------
SET FOREIGN_KEY_CHECKS=1;
代码运行截图说明
看下结果Info
Branch Rollbacking: 192.168.50.86:8091:2014816472 2014816473 jdbc:mysql://rm-m5e15q2904j706p79io.mysql.rds.aliyuncs.com:3306/eosplaygametest
io.seata.config.FileConfiguration : Could not found property transaction.undo.data.validation, try to use default value instead.
i.s.rm.datasource.undo.UndoLogManager : xid 192.168.50.86:8091:2014816472 branch 2014816473, undo_log deleted with GlobalFinished
io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
io.seata.core.rpc.netty.RmRpcClient : RmRpcClient sendResponse xid=192.168.50.86:8091:2014816472,branchId=2014816473,branchStatus=PhaseTwo_Rollbacked,result code =Success,getMsg =null
可以看到 Branch Rollbacking
分支事务成功回滚了
注意一下这个微服务的提供方法 并不需做任何修改 这个方法会自动被(RM管理)
@Service
public class User implements UserSerivce {
final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
user_roleMapper roleMapper;
public String GetUserName(){
user_role role= new user_role();
role.setGuid("2222");
role.setIssystem("1");
role.setName("222");
role.setTitle("bbbbb");
int i= roleMapper.insert(role);
return ""+i;
}
}
重要的事情说三遍
遵循JAVA 业务至上原则
代码没有侵入性
代码没有侵入性
代码没有侵入性
如果需要的话 补上ZK篇O(∩_∩)O哈哈~ 未完待续