Spring Cloud笔记(8)使用Seata管理分布式事务

分布式事务介绍

所谓事务,就是一系列业务操作构成的独立的执行单元。比如用户购买商品下单的行为,需要执行创建订单,扣减商品库存的两个不同的数据库操作,这就是一个事务。事务最重要的特性就是要支持原子性,要么所有操作全部成功,要么全部失败。为什么要这样设计呢?如果一切顺利,当然什么问题都不会有。但天有不测风云,没有谁能保证系统一直不会出错,如果哪一天订单已经创建成功了,但在扣减对应商品库存时突然失败了,那就麻烦大了,订单数据和商品的库存数据可能就对不上了,有可能商品早都没货了,客户还能继续下单购买。

所以,事务在保持数据一致性的方面是非常重要的。在单服务系统中我们一般不需要为事务操心,数据库已经为我们考虑了一切,只要在操作开始前声明了事务,那么调用过程只要发生了错误事务会自动回滚到操作开始时的状态。

但在微服务系统中,不同的业务操作可能被分割到不同的模块,而不同业务模块都会配置一个独立的数据源,甚至订单服务和仓储服务可能都不在同一个数据库中,这样显然就不能只依靠本地数据库事务来解决问题。我们需要一种能够跨网络和应用的分布式事务机制,分布式事务的主要实现思路一般分为两种:

  • 刚性事务:类似于上面说的数据库的本地事务,遵循ACID原则,要求数据的强一致性;代表性的实现就是二阶段提交(XA),通过引入一个事务协调者,所有事务的参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。XA一度是分布式事务的事实标准,但实际实现上却存在很多问题,比如事务在等待提交过程中处于同步阻塞状态,导致资源长时间占用,整体性能很差。

  • 柔性事务:遵循BASE理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。比如刚才我们说到的客户下单的例子,如果扣减库存的时候出错了,我们先不回滚系统,而是将扣减库存的调用请求存起来(比如存放到一个消息队列中),定时去重试。这样虽然订单数据和库存数据在某个时间段内是不一致的,但一旦库存服务恢复了,库存扣减请求就能够正常调用了,这样数据在某个时间点之后就会同步成一致的状态,这就是最终一致性。最终一致性提高了系统的可用性(客户下单可以不受仓储服务故障的影响),但也可能在数据不一致期间发生问题(比如超卖),这个需要考虑业务上是否能够接受。柔性事务的实现主要有重试和补偿两种机制,重试就是刚才描述方式,系统将出错的请求存储下来然后不断进行重试,直到成功;而补偿就是在出错之后,执行类似的一个undo操作,消除已提交的操作对数据的影响,比如扣减库存失败以后就直接把之前创建成功的订单再删除掉,TCC(Try/Confirm/Cancel)型事务就是采用这样的方式。

Seata 的相关概念

由于类似XA的刚性事务实现在复杂的分布式环境中存在大量的问题,所以目前主流的分布式事务实现方案都是走柔性事务路线的。比较流行的解决方案有阿里开源的seata,还有独立开发者发布的LCN框架,但LCN目前的维护更新遇到了困难(独立开发者的悲哀啊)。seata提供了几种不同的事务模式实现,包括:AT、TCC、SAGA 和 XA。在这里我们重点介绍一下AT模式,其它模式的说明请查看 seata的官网。AT模式也是走的补偿路线,但是不需要应用程序去关心调用失败后如何恢复数据,而是由框架本身负责去恢复收当前事务影响的所有数据,这非常类似于我们已经习惯了的本地事务,对开发者的负担也比较小。

seata可分为三个角色:TC,TM和RM:

  • TC - 事务协调者: 维护全局和分支事务的状态,驱动全局事务提交或回滚。业务无关,需要在服务端独立进行部署。

  • TM - 事务管理器:发起全局事务的开始,提交或回滚(发起后由TC协调其它的RM执行相应的操作),该角色会集成到应用程序当中。

  • RM - 资源管理器:全局事务的参与者,管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,该角色也是集成到应用程序当中的。

官网上用户购买商品的例子

部署TC server

首先在 官网 下载seata server的二进制文件,其实就是一个spring boot的应用程序。在服务器上解压之后,进入到conf文件夹,我们需要重点关注registry.conf这个文件,它的作用是配置seata server(TC)注册到哪个注册中心,目前支持几乎所有主流的注册中心,TC会从配置中心中读取相应的配置,TM和RM实例也可以通过同一个注册中心找到TC部署的位置,从而实现TC的集群化部署。我们还是延用之前就部署好的consul来作为注册中心,只需要将registry.conf中的type修改为“consul”即可,并调整配置文件中consul的部署地址:

# 注册中心配置,用于TC,TM,RM的相互服务发现
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "consul"
  consul {
    cluster = "seata"
    serverAddr = "192.168.1.220:8500"
  }
}
# 配置中心配置,用于读取TC的相关配置
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"
  file {
    name = "file.conf"
  }
}

file.conf中是TC的一些持久化配置选项,seata支持文件和数据库两种持久化方式,文件模式比数据库更快,但不支持并发操作,如果要部署TC集群就必须要使用数据库进行持久化。这里我们为了简化,就直接使用默认的file模式。file.conf也可以初始化到配置中心,通过配置中心来统一读取,这对于集群部署是有帮助的,初始化的方式可参考 这里 。修改完配置后,直接运行seata-server.sh启动TC server:

$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1

启动成功后我们可以在consul的控制台看到TC注册的服务:


TC注册成功

集成TM和RM

seata的AT模式会在服务调用失败后,自动恢复受影响的数据,其原理就是在启动事务之后,会自动分析当前事务中执行的SQL对数据的影响,把受影响的数据直接存储到本地数据库中,如果事务回滚了,就通过存储的数据备份对原始数据进行恢复( AT模式介绍 ),所以我们需要在每个RM所在的业务数据库中初始化seata的undo_log表。我们之前的spring-cloud-demo中也还没有建立相应的业务数据库,为了测试分布式事务,我们需要为order-service和storage-service创建两个业务库,并初始化相关的业务表格,整个初始化脚本如下:


-- 创建order-service数据库
CREATE DATABASE `cloud-demo-order`;

CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`t_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `customer_code` varchar(45) DEFAULT NULL COMMENT '客户编码',
  `good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
  `good_quantity` int(11) DEFAULT NULL COMMENT '购买数量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

-- 创建storage-service数据库
CREATE DATABASE `cloud-demo-storage`;

CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`t_inventory` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
  `good_quantity` int(11) DEFAULT NULL COMMENT '库存总量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

初始化脚本后,在order-serivce模块和storage-service模快引入seata的依赖包和相关配置:

  <!--在spring cloud中自动配置seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

需要注意的是maven库中还有一个包名称是spring-cloud-alibaba-seata,我看了一下两者的源码是一致的,任意引用哪一个包都可以,不知道为什么会有两个不同名称。在模块的application.yml中加入如下内容(以order-service模块的配置为例):

# Seata 配置项,对应 SeataProperties 类
seata:
  application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # 该应用所属事务组编号,用于寻找TC集群的映射
  # Seata 服务配置项,对应 ServiceProperties 类
  service:
    #事务分组与TC集群(seata)集群的映射关系,order-service-group对应tx-service-group参数,值为一个虚拟的TC集群名称
    #默认值就是default,如果无需分组可以不用设置
    vgroup-mapping:
      order-service-group: default
  # Seata注册中心配置项
  registry:
    type: consul # 注册中心类型,默认为 file
    consul:
      cluster: seata #对应seata集群在consul中注册的服务名
      server-addr: 192.168.1.220:8500

需要注意的是 registry.consul.cluster 中指定的名称需要与TC server的同名配置项中指定的名称一致(默认的值是default),否则会找不到TC server,这个在官方文档和很多教程中都没有说明。 更多配置内容可以查看 配置说明

另外为了方便对数据库进行操作,order-serivce模块和storage-service模快都引入了mybatis-plus框架的相关依赖和配置,这里就不做详细介绍了,具体可以去查看源码。一切准备好以后,将原有的orderService.createNewOrder方法做如下改造:

    @GlobalTransactional
    public Integer createNewOrder(OrderDTO orderDTO) {
        Order newOrder = new Order();
        newOrder.setCustomerCode(orderDTO.getCustomerCode());
        newOrder.setGoodCode(orderDTO.getGoodCode());
        newOrder.setGoodQuantity(orderDTO.getQuantity());
        //向本地数据库插入订单信息
        this.save(newOrder);
        InventoryChangeDTO req = new InventoryChangeDTO();
        req.setGoodCode(orderDTO.getGoodCode());
        req.setQuantity(orderDTO.getQuantity());
        //调用远程仓储服务变更库存
        Integer remainQuantity = storageService.updateInventoryOfGood(req);
        return remainQuantity;
    }

其实除了写入数据库的相关代码,这里最重要的变化就是加入了@GlobalTransactional注解,这个注解标记了当前方法会开启一个全局事务。在本例中order-service模块既是 TM (我的理解是声明@GlobalTransactional的地方就算是一个TM)又是 RM,而storage-service模块则是另一个 RM

通过PostMan请求创建订单的接口地址:http://localhost:9001/api/order/create-order,一切顺利的话就能在数据库中看到新增的订单数据和库存数据。

PostMan请求订单创建接口

从相关的日志中可以看出全局事务的开始和提交:

2020-05-06 15:28:21.311  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342569]
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342569] commit status: Committed
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] c.g.d.s.o.controller.Controller          : 剩余数量:-20
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] i.s.core.rpc.netty.RmMessageListener     : onMessage:xid=192.168.1.220:8091:2010342569,branchId=2010342570,branchType=AT,resourceId=jdbc:mysql://192.168.1.212:3306/cloud-demo-order,applicationData=null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch committing: 192.168.1.220:8091:2010342569 2010342570 jdbc:mysql://192.168.1.212:3306/cloud-demo-order null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

我们将 storageService.updateInventoryOfGood 方法稍稍修改一下,故意引发一个异常来测试全局事务的回滚:

public Integer changeInventory(InventoryChangeDTO req) {
        Inventory inventory = this.getOne(Wrappers.<Inventory>lambdaQuery().eq(Inventory::getGoodCode, req.getGoodCode()));
        if (inventory == null) {
            inventory = new Inventory();
            inventory.setGoodQuantity(0);
            inventory.setGoodCode(req.getGoodCode());
        }
        inventory.setGoodQuantity(inventory.getGoodQuantity() - req.getQuantity());
        this.saveOrUpdate(inventory);
        //引发异常回滚
        Object exceptionCause = null;
        exceptionCause.toString();
        return inventory.getGoodQuantity();
    }

还需要注意一点,测试全局事务回滚时需要将我们直接配置的hystrix fallback关闭,否则应用程序调用远程接口失败后会触发fallback机制,从而让seata认为远程调用是成功的,就不会触发回滚:

//@FeignClient(name = "storage-service", fallback = StorageServiceFallback.class)
//测试全局事务回滚时需要注释掉fallback,否则接口会返回默认的值导致事务无法回滚
@FeignClient(name = "storage-service")
public interface StorageService {

    @PostMapping("/api/storage/change-inventory")
    Integer updateInventoryOfGood(InventoryChangeDTO inventoryChangeDTO);

}

然后我们再次请求创建订单的服务,从日志上可以看出,事务已成功触发回滚操作:

2020-05-06 15:50:57.809  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342574]
2020-05-06 15:50:59.157  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-storage-service
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.netflix.loadbalancer.BaseLoadBalancer  : Client: storage-service instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-05-06 15:50:59.290  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2020-05-06 15:50:59.325  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.329  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client storage-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[192.168.1.252:9002],Load balancer stats=Zone stats: {unknown=[Zone:unknown;   Instance count:1;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.252:9002;    Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]}ServerList:ConsulServerList{serviceId='storage-service', tag=null}
2020-05-06 15:51:00.119  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342574] rollback status: Rollbacked

本文的相关代码可以查看这里 spring-cloud-demo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352