分布式事务-概述

分布式事务常见方案

1. 两阶段提交方案/XA

XA规范 X/Open组织提出的分布式事务的规范

image.png

同一个事务上下文中需要协调多种资源(即数据库以及消息主题或队列时)才有必要使用X/Open XA接口

1. XA规范 定义了全局事务管理器(Transaction Manager)和局部资源管理器(Resource Manager)之间的接口。

2. XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。

3. XA引入的事务管理器充当上文所述全局事务中的“协调者”角色。事务管理器控制着全局事务,管理事务生命周期,并协调资源。

4. 资源管理器负责控制和管理实际资源(如数据库或JMS队列)。目前,Oracle、Informix、DB2、Sybase和PostgreSQL等各主流数据库都提供了对XA的支持。

atomikos的实现

package com.zwz.atomikos.service;


import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;

import javax.transaction.*;
import java.sql.*;
import java.util.Properties;

/**
 * @description: x https://blog.csdn.net/qq_38322527/article/details/102580683
 * 在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。
 * 而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。
 * @date : 2020/1/7 11:52
 * @author: zwz
 */
public class AtomikosExample {
    private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
        // 连接池基本属性
        Properties p = new Properties();
        p.setProperty("url", "jdbc:mysql://127.0.0.1:3306/" + dbName);
        p.setProperty("user", "root");
        p.setProperty("password", "root");

        // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        //atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
        ds.setUniqueResourceName(dbName);
        ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        ds.setXaProperties(p);

        return ds;
    }

    public static void main(String[] args) {
        AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
        AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");

        Connection conn1 = null;
        Connection conn2 = null;
        PreparedStatement ps1 = null;
        PreparedStatement ps2 = null;

        // TransactionEssentials 实现了JTA/XA 规范中的事务管理器 应该实现的相关接口
        // UserTransactionManager 实现了 TransactionManager
        // TransactionImp 实现了 Transaction

/*        TransactionEssentials:

        1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

        UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

        TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

        Transaction实现是com.atomikos.icatch.jta.TransactionImp*/
        UserTransaction userTransaction = new UserTransactionImp();

        try {
            //开启事务
            userTransaction.begin();

            // 执行db1上的sql
            conn1 = ds1.getConnection();
            ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
            ps1.setString(1, "zwz");
            ps1.executeUpdate();

            ResultSet generatedKeys = ps1.getGeneratedKeys();
            int userId = -1;
            //获取自动生成的userId
            while (generatedKeys.next()) {
                userId = generatedKeys.getInt(1);
            }


            // 模拟异常 ,直接进入catch代码块,2个都不会提交
                int i = 1 / 0;

            // 执行db2上的sql
            conn2 = ds2.getConnection();
            ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
            ps2.setInt(1, userId);
            ps2.setDouble(2, 100);
            ps2.executeUpdate();

            //两阶段提交
            userTransaction.commit();
        } catch (NotSupportedException | SystemException | SQLException | HeuristicMixedException | HeuristicRollbackException | RollbackException e) {
            e.printStackTrace();
            try {
                userTransaction.rollback();
            } catch (SystemException ex) {
                ex.printStackTrace();
            }
        } finally {
            try {
                if (ps1 != null) {
                    ps1.close();
                }
                if (ps2 != null) {
                    ps2.close();
                }
                if (conn1 != null) {
                    conn1.close();
                }
                if (conn2 != null) {
                    conn2.close();
                }
                ds1.close();
                ds2.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }


    }
}

2pc:协调者询问参与者是否准备好了提交,并根据参与者的反馈情况决定向所有参与者发送commit或者rollback指令。

1. 准备阶段-投票阶段:协调者询问所有参与者是否准备好提交,参与者如果已经准备好提交则回复Prepared,否则回复Non-Prepared。

2. 提交阶段-称执行阶段。协调者如果在上一阶段收到所有参与者回复的Prepared,则在此阶段向所有参与者发送commit指令,所有参与者立即执行commit操作;否则协调者向所有参与者发送rollback指令,参与者立即执行rollback操作。

第一阶段

image

事务管理器通知参与该事务的各个资源管理器,通知他们开启事务、执行SQL(暂不提交),并进入prepare状态(该状态下可执行commit / rollback)。

资源管理器接收到消息后开始准备阶段,写好事务日志并执行事务,但不提交,然后将是否就绪的消息返回给事务管理器(此时已经将事务的大部分事情做完,以后的内容耗时极小)。

第二阶段

image

事务管理器在接受各个消息后,开始分析,如果有任意其一失败,则发送回滚命令,否则发送提交命令。

各个资源管理器接收到命令后,执行(耗时很少),并将提交消息返回给事务管理器。

事务管理器接受消息后,事务结束,应用程序继续执行。

JTA Java Transaction API 基于XA架构建模的

1. 在JTA中,事务管理器抽象为javax.transaction.TransactionManager接口,并通过底层事务服务(即Java Transaction Service)实现。

2. 像很多其他的Java规范一样,JTA仅仅定义了接口,具体的实现则是由供应商(如J2EE厂商)负责提供,目前JTA的实现主要有以下几种:

  • J2EE容器所提供的JTA实现(如JBoss)。

  • 独立的JTA实现:如JOTM(Java Open Transaction Manager),Atomikos。这些实现可以应用在那些不使用J2EE应用服务器的环境里用以提供分布事事务保证。

2. TCC 方案

2.1 TCC方案是采用最终一致性的方式实现的服务层柔性分布式事务方案

TCC

1. try阶段:这个阶段说的是对各个服务的资源做检测以及对资源进行锁定或者预留。

2. confirm阶段:这个阶段说的是在各个服务中执行实际的操作。

3. cancel阶段:如果任何一个服务的业务方法执行出错,这里就需要进行补偿,就是执行已经执行成功的业务逻辑的回滚操作。

2.2 TCC方案评估

  • 优点:
    • JTA方案只能解决同一服务的多数据源的分布式事务问题,TCC方案可以解决微服务架构内同一事务多个服务上连接数据库的提交操作。
    • XA协议采用刚性事务方案,性能和吞吐量较低,TCC采用柔性分布式事务方案
  • 缺点:
    • 对业务的侵入性大
    • 需要考虑预留资源
    • 编写大量的try、confirm、cancel方法
    • 考虑方法幂等性问题

2.3 阿里的seata方案

示例 https://github.com/seata/seata-samples/tree/master/springcloud-eureka-feign-mybatis-seata

    @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
    public void create(Order order) {
        LOGGER.info("------->交易开始");
        //本地方法
        orderDao.create(order);

        //远程方法 扣减库存
        storageApi.decrease(order.getProductId(),order.getCount());

        //远程方法 扣减账户余额
        LOGGER.info("------->扣减账户开始order中");
        accountApi.decrease(order.getUserId(),order.getMoney());
        LOGGER.info("------->扣减账户结束order中");

        LOGGER.info("------->交易结束");
    }

seata方案的事务提交过程

Seata管理的分布式事务的典型生命周期:

image.png
  • TM 要求TC开始一个新的全局事务。TC生成代表全局事务的XID

  • XID 通过微服务的调用链传播

  • RM 将本地事务注册为XID到TC的相应全局事务的分支

  • TM 要求TC提交或回退相应的XID全局事务

  • TC驱动XID的相应全局事务下的所有分支事务以完成分支提交或回滚

seata相对于其它分布式事务的最大区别是在第一提交阶段就将各个事务进行了commit操作,这样节约了两个阶段持有锁的事件,提高了整体的执行效率。通过xid管理全局事务,如果要全局回滚,通过xid找到对应的回滚日志记录,通过回滚记录生成反向更新sql,进行更新回滚操作

2.4 问题:现有的TCC事务方案的性能瓶颈在哪里?能支撑高并发交易场景吗?如何优化?

  1. byteTcc (纯正的TCC)基于数据库里面创建的一些表,基于表中的数据进行状态的更新

  2. 核心链路中的各个服务都需要跟TC这个角色进行频繁的网络通信。频繁的通络通信其实就会带来性能的开销,本来一次请求不引入分布式事务只需要100ms,此时引入了分布式事务之后可能需要200ms

  3. 网络请求可能还挺耗时的,上报一些分支事务的状态给TC,seata-server,选择基于那种存储来放这些分布式事务日志或者状态的,file/磁盘文件/mysql(数据库开存放对应的一些状态)

  4. 高并发场景下,会不会有问题,seata-server,你也需要支持扩容,也需要部署多台机器,用一个数据库来存放分布式事务的日志和状态的话,假设并发量每秒上万,分库分表,对TC背后的数据库也会有同样的压力。

  5. 这个时候对TC背后的db也得进行分库分表,抗更高的并发压力。

3. 本地消息表 ebay

流程示意图

1. A系统在自己本地一个事务里操作同时,插入一条数据到消息表

2. 接着A系统将这个消息发送到MQ中去

3. B系统接收到消息之后,在一个事务里,先往自己本地消息表里面插一条数据,同时执行其他的业务操作。

如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息

4. B系统执行成功之后,就会更新自己本地消息表的状态以及A系统消息表的状态

5. 如果B系统处理失败了,那么就不会更新消息表状态,那么此时A系统会定时扫描自己的消息表,如果有没处理的消息,会再次发送到MQ中去,让B再次处理。

6. 这个方案保证了最终一致性,哪怕B事务失败了,但是A会不断重发消息,直到B那边成功为止。

问题

  • 最大的问题在于严重依赖数据库的消息表来管理事务

  • 不太适合高并发

4. 可靠消息最终一致性方案

4.1 基于RocketMQ的流程:

流程示意图

1. A系统先发送一个prepared消息到RocketMQ,如果这个prepared消息发送失败就直接取消操作别执行了。

2. 如果这个消息发送成功了,那么接着执行本地事务,如果成功就告诉MQ发送确认消息,如果失败就告诉MQ回滚消息。

3. 如果发送了确认消息,那么此时B系统会接收到确认消息,然后执行本地事务。

4. MQ会自动定时轮询所有preapred消息回调你的接口。

5. 要是系统B的事务失败了咋办?

  • 重试咯。自动不断重试直到成功,如果实在是不行,那么就是针对重要的资金类业务进行回滚。

  • 比如B系统本地回滚后,想办法通知系统A也回滚,或者是发送报警由人工来手工回
    滚和补偿。

    模拟电商流程


    代码示例

  • 生产者

package com.bfxy.rocketmq.transaction;

import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @date : 2020/3/19 10:06
 * @author: zwz
 */
public class TransactionProducer {

    public static void main(String[] args) throws UnsupportedEncodingException, MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("text_tx_producer_group_name" + "-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.setExecutorService(executorService);
        //这两个对象主要做两件事情。就是异步的执行本地事务。第二件事就是做消息回查
        TransactionListener transactionListener = new TransactionListener() {

            //先发送消息再执行本地事务
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.err.println("-----执行本地事务-----");
                String callArg = (String) arg;
                System.err.println("callArg:" + callArg);
                System.err.println("msg:" + msg);
                // tx.begin

                // 数据库落库操作

                // tx.commit

                return LocalTransactionState.UNKNOW;  //UNKONOW 消息不可达
            //    return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.err.println("----回调消息检查----msg:" + msg);
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        producer.setTransactionListener(transactionListener);
        producer.start();

        Message message = new Message("test_tx_topic0", "tagA", "key",
                ("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //同步发送消息,成功返回后才执行本地事务。消息发送状态和本地事务执行结果用同一个状态 LocalTransactionState
        //endTransaction 中要么发送回滚消息,要么发送确认消息
        producer.sendMessageInTransaction(message, "我是回调的参数");

        //后面可以继续其他的任务操作

        for (int i = 0; i < 100 * 1000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

}
  • 消费者
package com.bfxy.rocketmq.transaction;

import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

/**
 * @description:
 * @date : 2020/3/19 10:54
 * @author: zwz
 */
public class TransactionConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("text_tx_producer_group_name");
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("test_tx_topic0", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("收到消息=> topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);

                    //这里执行业务逻辑:数据库的事务操作等

                    //应该捕获最大异常

                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

问题

  • 这个消息是不是本地事务处理失败了,所以没有发确认消息?

  • 那是继续重试还是回滚?

  • 一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。

  • 这个就是避免可能本地事务执行成功了,而确认消息发送失败了。

4.2 基于其他的消息队列

  1. 需要自己实现一个可靠消息服务,接收Producer发送的half message,然后返回响应给Producer。如果Producer没收到响应,则重发。 然后Producer执行本地事务,接着发送commit/rollback给可靠消息服务。

  2. 可靠消息服务启动一个后台线程定时扫描本地数据库表中所有half message,超过一定时间没commit/rollback就回调Producer接口,确认本地事务是否成功,获取commit/rollback

  3. 如果消息被rollback就废弃掉,如果消息被commit就发送这个消息给下游服务,或者是发送给RabbitMQ/Kafka/ActiveMQ,都可以,然后下游服务消费了,必须回调可靠消息服务接口进行ack

  4. 如果一段时间都没收到ack,则重发消息给下游服务

5. 最大努力通知方案

示意图

1. 系统A本地事务执行完之后,发送消息到MQ

2. 这里会有专门消费MQ的最大努力通知服务,这个服务会消费MQ然后写入数据库中记录下来,或者是放入内存队列也行,接着调用系统B的通知

3. 要是系统B执行成功就OK了,要是B执行失败了,那么最大努力通知服务就定时重新调用系统B,反复N此,最后还是不行就放弃。

真实场景

99%的分布式接口调用,不做分布式事务,直接就是监控(发邮件、发短信)、记录日志、事后快速定位、排查和出解决方案、修复数据。

参考文献

  1. http://www.bytesoft.org/
  2. https://github.com/seata/seata
  3. https://blog.csdn.net/xinquanv1/article/details/103148729
  4. https://gitee.com/shishan100/Java-Interview-Advanced
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352