通过消息中间件RabbitMQ完成分布式事务

一个有意思的问题:
"如果昨天是明天就好了,那么今天就是周五了......."
那么请问 今天是周几?


最近做的一个微服务项目出了一个问题:
需求:用户下单,我们将其信息存入订单表order和order-item两个表中,同时对商品库中的商品sku表进行减库操作
问题:(其实是粗心,redis配置错误),导致用户下单后,订单库中的两个表都会增加记录,但是商品库中的SKU表却没有进行减库操作
出现这种问题肯定不行,在经过检查后,发现是redis没有配置好host,但是也引出了一致性这个问题,订单服务和商品服务其实都开启了本地事务,但是却没有起到作用,原因在于分布式架构下的项目由多个微服务构成,每个功能模块都被拆分成一个服务,而每个服务开启的本地事务只能保证自己服务内的一致性,在上述问题中,订单服务中没有出错,自然不会回滚,但商品服务却由于redis数据取不到根本没有完成功能,出现了这个问题.那么分布式架构如何解决事务一致性的问题呢?
答案是使用Seata来完成分布式事务,Seata(原名Fescar) 是阿里18年开源的分布式事务的框架。
它由如下三个部分组成:
ansaction Coordinator (TC) : 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚
流程大概如下:

分布式事务

1.当用户进行下单操作,请求直接访问到订单服务的add方法,我们在该方法上加一个全局事务注解@GlobalTransactional(name = "order_add"),TM向TC申请开启一个全局事务,并生成一个全局唯一的全局事务ID
2.接下来订单服务通过feign远程调用其他服务例如商品服务,将全局事务ID传递下去,获得该ID的服务RM会向TC注册一个事务分支,TC会把各个RM归拢到一个全局事务之中,并且创建一把全局锁
3.各服务正常完成功能,在操作数据库时会记录一个undolog日志文件
4.一旦出现错误,TM会立即向TC发起回滚请求,TC即刻调遣全局事务ID管辖下的所有分支事务完成回滚,数据库根据undolog文件同步通知各RM进行反向更新操作,成功后移除undolog文件,释放全局锁
5.如果正常运行,TM就发起全局提交,TC释放全局锁,异步通知各个RM,移除undolog文件

详细代码实现:
如图

1.先创建一个fescar服务,完成全局事务ID的传递和绑定
定义全局事务管理器扫描对象

package com.changgou.fescar.config;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fescar.rm.datasource.DataSourceProxy;
import com.alibaba.fescar.spring.annotation.GlobalTransactionScanner;
import com.changgou.fescar.filter.FescarRMRequestFilter;
import com.changgou.fescar.interceptor.FescarRestInterceptor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;

import javax.sql.DataSource;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;

/**
 *  * 创建数据源
 *  * 定义全局事务管理器扫描对象
 *  * 给所有RestTemplate添加头信息防止微服务之间调用问题
 */
@Configuration
public class FescarAutoConfiguration {

    public static final String FESCAR_XID = "fescarXID";

    /***
     * 创建代理数据库
     * @param environment
     * @return
     */
    @Bean
    public DataSource dataSource(Environment environment){
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(environment.getProperty("spring.datasource.url"));
        try {
            dataSource.setDriver(DriverManager.getDriver(environment.getProperty("spring.datasource.url")));
        } catch (SQLException e) {
            throw new RuntimeException("can't recognize dataSource Driver");
        }
        dataSource.setUsername(environment.getProperty("spring.datasource.username"));
        dataSource.setPassword(environment.getProperty("spring.datasource.password"));
        return new DataSourceProxy(dataSource);
    }

    /***
     * 全局事务扫描器
     * 用来解析带有@GlobalTransactional注解的方法,然后采用AOP的机制控制事务
     * @param environment
     * @return
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(Environment environment){
        String applicationName = environment.getProperty("spring.application.name");
        String groupName = environment.getProperty("fescar.group.name");
        if(applicationName == null){
            return new GlobalTransactionScanner(groupName == null ? "my_test_tx_group" : groupName);
        }else{
            return new GlobalTransactionScanner(applicationName, groupName == null ? "my_test_tx_group" : groupName);
        }
    }

    /***
     * 每次微服务和微服务之间相互调用
     * 要想控制全局事务,每次TM都会请求TC生成一个XID,每次执行下一个事务,也就是调用其他微服务的时候都需要将该XID传递过去
     * 所以我们可以每次请求的时候,都获取头中的XID,并将XID传递到下一个微服务
     * @param restTemplates
     * @return
     */
    @ConditionalOnBean({RestTemplate.class})
    @Bean
    public Object addFescarInterceptor(Collection<RestTemplate> restTemplates){
        restTemplates.stream()
                .forEach(restTemplate -> {
                    List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
                    if(interceptors != null){
                        interceptors.add(fescarRestInterceptor());
                    }
                });
        return new Object();
    }

    @Bean
    public FescarRMRequestFilter fescarRMRequestFilter(){
        return new FescarRMRequestFilter();
    }

    @Bean
    public FescarRestInterceptor fescarRestInterceptor(){
        return new FescarRestInterceptor();
    }
}

过滤器:

public class FescarRMRequestFilter extends OncePerRequestFilter {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger( FescarRMRequestFilter.class);

    /**
     * 给每次线程请求绑定一个XID
     * @param request
     * @param response
     * @param filterChain
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        String currentXID = request.getHeader( FescarAutoConfiguration.FESCAR_XID);
        if(!StringUtils.isEmpty(currentXID)){
            RootContext.bind(currentXID);
            LOGGER.info("当前线程绑定的XID :" + currentXID);
        }
        try{
            filterChain.doFilter(request, response);
        } finally {
            String unbindXID = RootContext.unbind();
            if(unbindXID != null){
                LOGGER.info("当前线程从指定XID中解绑 XID :" + unbindXID);
                if(!currentXID.equals(unbindXID)){
                    LOGGER.info("当前线程的XID发生变更");
                }
            }
            if(currentXID != null){
                LOGGER.info("当前线程的XID发生变更");
            }
        }
    }
}

拦截器:

public class FescarRestInterceptor implements RequestInterceptor, ClientHttpRequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if(!StringUtils.isEmpty(xid)){
            requestTemplate.header( FescarAutoConfiguration.FESCAR_XID, xid);
        }
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        String xid = RootContext.getXID();
        if(!StringUtils.isEmpty(xid)){
            HttpHeaders headers = request.getHeaders();
            headers.put( FescarAutoConfiguration.FESCAR_XID, Collections.singletonList(xid));
        }
        return execution.execute(request, body);
    }
}

2.在对应方法上开启全局事务@GlobalTransactional(name = " ")

脏读和脏写

脏读:在RM提交本地事务之后,TC提交全局事务之前,由于本地事务已提交,其他事务便可以读取到已提交事务修改的数据,但在TC回滚全局事务之后,其他事务读取到的数据又会变为更新前的数据

官方给出的解决方案为:脏读取Select语句用于更新,代理方法使用@ GlobalLock + @ Transactional或@GlobalTransaction

脏写:在TC通知分支事务回滚后,分支事务回滚之前,发现涉及的数据已经被修改,无法和UNDO LOG记录中的旧数据匹配上,则会回滚失败!

官方给出的解决方案为: 脏写您必须使用@globaltransaction注意:如果要查询的业务接口不使用@globaltransactional批注,这意味着该方法不需要分布式事务,则可以在该方法上批注@ globallock + @ Transactional批注方法,然后在查询中添加for update语句。如果您的查询接口在事务链接的外边缘具有@globaltransactional批注,则只需在查询中添加for update语句即可。设计此批注的原因是,在分布式注释可用之前,分布式事务需要查询已提交的数据,而业务则不需要分布式事务。使用GlobalTransactional批注会增加一些不必要的RPC额外开销,例如开始返回xid,提交事务等。

@globaltransactional注解会开启全局事务和本地事务

@ globallock 声明事务仅执行在本地RM中,但是本次事务确保在更新状态下的操作记录不会被其他全局事务操作。即将本地事务的执行纳入seata分布式事务的管理,一起竞争全局锁,保证全局事务在执行的时候,本地业务不可以操作全局事务中的记录。

@ globallock + @ Transactional注解只会开启本地事务和获取全局锁

一阶段本地事务提交前,需要确保先拿到 全局锁 。
拿不到 全局锁 ,不能提交本地事务。
拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

举例:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。

tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

通过消息中间件完成分布式事务保障最终一致性

需求:用户下单成功同时增加一定积分

通过消息中间件完成分布式事务

1.订单服务开启全局事务,请求访问订单服务之后下单成功,并且向任务表中添加一个任务数据
2.设置一个定时任务,每隔例如3s就扫描一次任务表,如果其中有任务那么将把该任务发送给MQ
3.用户服务接收消息之后,先判断redis中是否存在该任务,有则返回,避免重复消费消息导致加多次积分
4.开启本地事务,检查在数据库对应的积分日志表中是否有该任务数据,有也返回
5.redis中与DB中都不存在该任务数据时,表明该消息未被消费,将该任务信息task存入redis中
6.修改用户表中的积分信息,然后增加积分日志表中的任务数据
7.删除redis中任务task信息
8.将得到消息中task信息再次返回给订单服务,让其删除任务表中的对应数据,同时为了便于统计等功能将对应数据添加到历史任务表中
9.若是在用户服务修改积分时出现错误,导致事务回滚,由于任务表中数据没有被删除,那么用户服务将继续消费同一消息,直至成功为止,如此虽然没有保证实时一致性,但是保证了最终一致性,
注意:由于redis中数据无法被事务回滚,我们应该给它设置一个适当的短期过期时间,比如30s内

勿忘我!

知识回顾:

事务

数据库事务(简称:事务,Transaction)是指数据库执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。
事务拥有以下四个特性,习惯上被称为ACID特性:
原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态是指数据库中的数据应满足完整性约束。除此之外,一致性还有另外一层语义,就是事务的中间状态不能被观察到(这层语义也有说应该属于原子性)。
隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行,如同只有这一个操作在被数据库所执行一样。
持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。在事务结束时,此操作将不可逆转。

本地事务
起初,事务仅限于对单一数据库资源的访问控制,架构服务化以后,事务的概念延伸到了服务中。倘若将一个单一的服务操作作为一个事务,那么整个服务操作只能涉及一个单一的数据库资源,这类基于单个服务单一数据库资源访问的事务,被称为本地事务(Local Transaction)。

CAP定理:

分布式系统的最大难点,就是各个节点的状态如何同步。CAP 定理是这方面的基本定理,也是理解分布式系统的起点。
分布式系统有三个指标:
Consistency 一致性
Availability 可用性
Partition tolerance 分区容错性
这三个指标不可能同时做到。这个结论就叫做CAP 定理。而分区容错P无法避免,所以C一致性和A可用性不可同时做到

BASE理论:

BASE 理论是对 CAP 中一致性和可用性权衡的结果,其来源于对大型互联网分布式实践的总结,是基于 CAP 定理逐步演化而来的。其核心思想是:
既然无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。
全称:Basically Available(基本可用),Soft state(软状态),和 Eventually consistent(最终一致性)三个短语的缩写
Basically Available(基本可用):
当服务出现故障时,功能会出现一些损失,但还是基本能用
Soft state(软状态):
什么是软状态呢?相对于原子性而言,要求多个节点的数据副本都是实时一致的,这是一种 “硬状态”。
软状态指的是:允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。
Eventually consistent(最终一致性):
不保证各个节点的数据副本是实时一致的(强一致性),会存在延时同步,但保证最终的结果是一致的,这就是最终一致

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351