改造Spring事务实现Spring Cloud分布式事务

网上很多资料或组件都是完全重新实现事务管理,而本文选择改造Spring事务的提交过程,这样在即简化了集成成本,又不会因事务管理代码出现Bug导致系统没有满足事务特性(ACID),即如果在一次微服务的调用过程中所有的Spring的事务都提交或撤销了,整个系统也满足ACID。

说起事务本身就挺麻烦的,在单点应用中事务由数据库来实现,但却是个业务层概念。Spring通过线程存储共享事务对象实现了业务层事务。网上常见的分布式事务协议有2PC、3PC与TCC,其中TCC对业务层侵入性强,对业务设计改动比较大,不符合前置设想。
2PC/3PC协议是资源层协议,比较适用于数据库直接互相沟通事务,我们可以借助于Spring的业务层事务方式实现业务层的分布式事务。3PC第一个阶段要求所有参与者达成一致后再进行事务操作,即在一开始要求所有参与者都已就位,而微服务的链式调用是同步的,参与者是逐步增加的,所以3PC在微服务场景基本不可实现。本文选择实现2PC协议,2PC协议也是异步的,但2PC不要求所有参与者一开始就位,只要在阶段2之前就位即可,同步可以看做是2PC的异步的一种特殊的情况。

一致性与原子性

一致性这个词在各种资料中算是歧义特别大了,原子性反而出奇的统一。

原子性目前有两种意思,一个是多线程中操作是不可分的,另一个是事务中所有的操作要么都成功,要么全都失败。这两种意思语境很少重复,所以提到原子性基本不会有歧义。

一致性有分布式系统的一致性、事务的一致性、一致性Hash等解释,事务的一致性简单来说就是事务前后数据库的完整性约束不能破坏,它是由事务的其他3个特性共同保证的。
分布式系统中的一致性一般指的是CAP理论中的C,它的定义是所有节点的数据是相同的,又有人把这个定义称为一致性状态,而在分布式系统中协商达成一致性状态的协议算法为一致性协议,2PC就是一个一致性协议。

当事务和分布式系统有交集时,一致性这个词就彻底乱套了,甚至和原子性搅在了一起,网上铺天盖地的最终一致性的资料很多其实是事务的最终原子性。

本文中一致性从CAP理论取义为 所有节点的数据是相同的,英文为"all nodes see the same data at the same time"

因为分布式系统天然有事务的隔离性,事务的持久性由数据库保证,因此我们只要在Spring事务基础上实现分布式事务的原子性,就进而实现了分布式事务的一致性。

即,只要在数据库事务的基础上实现分布式事务的原子性,就实现了分布式事务的ACID4个特性。因此在后文中用原子性指代分布式事务的整体4个特性。

因Spring的事务使用导致的事务一致性错误不在本文讨论范围内。

对于2PC协议,一致性状态为所有节点对事务的提交或回滚达成一致,一致性的结果是事务的原子性。因此2PC协议的一致性和分布式事务的一致性可以看做是等价的。

本文实现与2PC协议的差异性

2PC的各个参与者是平等,互相隔离的。但微服务不是,微服务是有调用与被调用关系的,被调用者的异常会天然返回给调用者并自然引起调用者的连锁回滚。
2PC协议要求服务恢复后能回到事务中,但这个能力只有数据库自己有,Spring的事务提交前宕机,数据库会自己回滚。因此本文实现的2PC协议会在任何一个参与者宕机后整体回滚,此外还有在一致提交时,业务宕机也可能导致的原子性失败。
由于无法保证业务宕机恢复后重试事务的上下文与宕机前一致,所以这个问题不能简单的在技术上解决,必须要业务配合。比如订单、仓储业务,当用户支付订单后仓储服务提交事务前宕机,仓储服务恢复后并不确定还有没有货,如果没货,只能在业务中退款。

协调者宕机异常

协调者正常的前提下,参与者宕机,我们可以简单的回滚事务才达成一致性。如果协调者宕机且没有Fail Over,这种情况需要详细分析。
如果协调者在阶段一宕机,由于微服务的同步性,整个调用栈会在某个业务阻塞等待进行阶段一投票,与2PC协议相同。即协调者在阶段一宕机会导致业务阻塞但不会造成不一致。
如果协调者阶段二宕机,此时同步调用已经完成,等待提交的消息。如果等待超时,也会导致参与者回滚。所以本文的实现,在阶段二协调者和参与者任一宕机都可能导致不一致。


协调者

本文采用Redis充当协调者,但Redis主从切换可能导致数据不一致,上一节中讨论阶段一协调者宕机不会造成不一致,我们要在设计时弥补这一可能性。

协调者协议实现

设计的存储结构如下

数据类型 描述
cloud-transaction/事务ID/state Long 事务存在标志,
值为0时表示事务异常,后续的参与者自行回滚
值为1表示阶段1,值为2表示阶段2
cloud-transaction/事务ID/result String 事务的最终结果,是COMMIT还是ROLLBACK
cloud-transaction/事务ID/notice String 协调者向参与者的通知订阅KEY
cloud-transaction/事务ID/vote Hash[参与者,状态] 投票记录,
HKEY为参与者ID,HValue为投票状态
cloud-transaction/事务ID/ack Hash[参与者,状态] 执行记录,
HKEY为参与者ID,HValue为执行状态

正常流程为:

  1. 业务的开始方为watchdog,负责创建事务ID,写入state为1,并在微服务之间传递事务ID
  2. 判断state值是否为1,若是将Spring事务的执行结果写入投票记录,否则自行ROLLBACK
  3. 调用栈返回至watchdog处,此时可以获取到调用栈是否有异常,同时与投票记录互补
  4. watchdog写state为2,同时将结果写入result
  5. watchdog在notice上广播结果,并设置各键值的过期时间
  6. 各参与者收到通知后执行结果,并写入执行记录

为防范1-3步(阶段1)Redis或参与者异常,在步骤4做如下设定:

  1. 调用栈或投票记录只要有一个为ROLLBACK,整体ROLLBACK
  2. state必须为1或键不存在

即如果Redis异常未能恢复或参与者业务异常,调用栈必然异常。反之调用栈无异常,则表明无业务异常并且Redis最终正常(最终一致性的最终),可通过校验state值判断vote值的可信度。当参与者超时后可以从state和result综合判断是否应该提交事务。有的Redis设置Master宕机时,Slave可以读数据,这种设置最终会在各事务超时回滚。

这么多问题这东西还能用么

标准2PC的协调者是没有存储的,宕机再恢复需要从各参与者获取事务数据。本实现用的是Redis,Redis还是挺可靠的,还可以主从顶一顶。如果系统压力没那么大,负载并不高,用起来是没啥问题的。并且本实现还支持事务重用,即当微服务调用兜兜转转又回来的时候,事务是重用的,可以解决隔离性问题。

为什么要造轮子

  • 纸上得来终觉浅
  • 找工作简历投出去就没影了也很苦恼啊

关键代码

在微服务之间共享事务ID

通过RequestInterceptor为请求微服务之间的调用增加一个http头,这样可以方便的传递事务ID。同时如果事务创建时没有这个http头,那么当前业务就处在微服务栈的栈底

@Component
public class CloudTransactionIdFeignInterceptor implements RequestInterceptor {

    private static final String REQUEST_ATTRIBUTE_TRANSACTION_ID = "X-TRANSACTION-ID";
    private static final String REQUEST_HEADER_TRANSACTION_ID = "X-TRANSACTION-ID";

    public static HttpServletRequest getCurrentRequest() {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.currentRequestAttributes();
        return attributes.getRequest();
    }

    @Override
    public void apply(RequestTemplate template) {
        try {
            HttpServletRequest currentRequest = getCurrentRequest();
            Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
            if (attribute != null) {
                template.header(REQUEST_HEADER_TRANSACTION_ID, attribute.toString());
            } else {
                String header = currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
                if (StringUtils.hasText(header)) {
                    template.header(REQUEST_HEADER_TRANSACTION_ID, header);
                }
            }
        } catch (Throwable e) {
            //不能影响正常的流程运行
        }
    }
    //获取事务ID
    public static String getCloudTransactionId() {
        HttpServletRequest currentRequest = getCurrentRequest();
        Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
        if (attribute != null)
            return attribute.toString();

        return currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
    }
    //广播事务ID
    public static void broadcastCloudTransactionId(String id) {
        getCurrentRequest().setAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID, id);
    }
}

拦截Spring事务的提交

Spring通过PlatformTransactionManager Bean对象管理事务,我们实现一个Wrapper把Spring的Bean包起来,拦截关键调用。具体代码见类 WrappedDataSourceTransactionManager

对Spring 事务的 ThreadLocal数据的处理

private String doMoveThreadData(String id) {

        TransactionThreadData data = new TransactionThreadData();
        data.resources = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
        data.synchronizations = TransactionSynchronizationManager.getSynchronizations();
        data.currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
        data.currentTransactionReadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
        data.currentTransactionIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
        data.actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();

        for (Object key : data.resources.keySet()) {
            TransactionSynchronizationManager.unbindResource(key);
        }

        TransactionSynchronizationManager.clear();

        dataMap.put(id, data);

        return id;
    }

    public boolean restoreThreadData(String id) {
        TransactionThreadData data = dataMap.get(id);
        dataMap.remove(id);

        if (data == null)
            return false;

        for (Map.Entry<Object, Object> entry : data.resources.entrySet()) {
            TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
        }

        TransactionSynchronizationManager.initSynchronization();
        for (TransactionSynchronization synchronization : data.synchronizations) {
            TransactionSynchronizationManager.registerSynchronization(synchronization);
        }

        TransactionSynchronizationManager.setCurrentTransactionName(data.currentTransactionName);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(data.currentTransactionReadOnly);
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(data.currentTransactionIsolationLevel);
        TransactionSynchronizationManager.setActualTransactionActive(data.actualTransactionActive);

        return true;
    }

完整代码见 类 TransactionThreadDataContainer

微服务饶了一圈又回来,继续用未提交的事务

Spring事务是基于ThreadLocal的,只要把ThreadLocal搬过来就可以了

    public static class LoadBalancerFeignClientWrapper implements Client {
        private LoadBalancerFeignClient wrapped;
        private WrappedDataSourceTransactionManager transactionManager;

        public LoadBalancerFeignClientWrapper(Client delegate,
                                       CachingSpringLoadBalancerFactory lbClientFactory,
                                       SpringClientFactory clientFactory, PlatformTransactionManager transactionManager) {
            wrapped = new LoadBalancerFeignClient(delegate, lbClientFactory, clientFactory);
            this.transactionManager = (WrappedDataSourceTransactionManager)transactionManager;
        }

        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            try {
                 //这里是调用其他微服务,调用的时候把TLS剥离出来
                 //调用完毕或异常时再恢复回来
                 //剥离后的数据就可以安放到其他的线程,达到重用事务的目的
                transactionManager.stealTransactionThreadData();
                Response response = wrapped.execute(request, options);
                transactionManager.returnTransactionThreadData();

                return response;
            } catch (Throwable e) {
                transactionManager.returnTransactionThreadData();
                throw e;
            }
        }
    }

超时

超时使用的是reactor-coreMono,详细见Reactor 3 Reference Guide

        //超时设置
        Mono.delay(Duration.ofMillis(maxWaitTime))
                .map(t -> onTransactionTimeout(transactionId))
                .publishOn(Schedulers.parallel())
                .subscribe();

用的不是timeout而是delay,所以onTransactionTimeout一定会执行,检测事务的结果。

阶段2结果通告

用的时Redis的订阅发布功能。

测试

代码见 https://github.com/giafei/cloud-transaction

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容