RocketMQ事务消息

概述

事务消息解决的问题是:Provider本地事务 + 消息投递 一起执行。解决应用端 和 MQ端两个独立的应用的操作,在一个事务里面完成
因为传统的模式无法保证这一点,比如MQ宕机,或者网络丢失,而事务消息有一个两阶段确认的这一操作,可以大大降低这种丢失的概率。
但是这个功能和消费者无关,并不能确保该消息能被消费者成功消费。

消费端同样也存在这个分布式的问题: 成功的从MQ中取出消息到本地 + 消费端成功业务上消费这个消息

思考题

RocketMQ有发送同步消息的功能,只有Broker Ack Send_OK状态码时才代表消息发送成功,否则阻塞重试,重试2次还失败就报错。
既然同步消息可以保证消息成功的写入到MQ中,为什么还要有事务消息呢?
事务消息解决的问题是:Provider本地事务 + 消息投递 一起执行。
而同步消息解决的问题是:消息一定投递成功。

比如 工行用户A 向 建行用户B转账1万元。
使用同步消息:

  1. 工行系统发送一个同步消息给MQ,给B增款1万元
  2. MQ ack反馈发送成功了
  3. 工行系统给用户A扣款1万元
    可能的问题,ack Send_OK之后,工行系统抛出异常,没有给用户A扣款,但是消息已经发送出去了,B赠款成功了。

使用事务消息:

  1. 工行系统发一个事务消息给MQ,给B增款1万元
  2. Broker precommit成功,回调excuteCommit,真正执行工行用户A扣款1万元
  3. 扣款成功ACK Commit给MQ
  4. MQ收到Commit ACK时,commit消息,建行系统可以消费这个消息
    如果工行系统扣款异常,则消息虽然prepareCommit在MQ中,但是对建行不可见。另外如果ACK网络丢失或者延时,MQ如果超时未接收到ACK,会发起重试确认到工行。
    最终确保:扣款 + 消息成功投递 在一个事务里面执行

实现原理

  1. 投递消息:Producer向Broker投递一个事务消息,并且带有唯一的key作为参数(幂等性)
  2. Broker预提交消息(在Broker本地做了存储,但是该消息的状态对Consumer不可见)
  3. Broker预提交成功后回调Producer的executeLocalTransaction方法
  4. Producer提交业务(比如记录最终成功投递的日志),并根据业务提交的执行情况,向Broker反馈Commit 或者回滚
  5. Broker最终处理
  • Broker监听到Producer发来的Commit反馈时,会最终提交这个消息到本地,此时该事务消息对Consumer可见,事务消息最终投递成功,事务结束
  • Broker监听到Producer发来的RollBack反馈时,会最终回滚掉本地的预提交的消息,事务消息最终投递失败,事务结束
  • Broker超时未接受到Producer的反馈,会定时重试调用Producer.checkLocalTransaction,Producer会根据自己的执行情况Ack给Broker

Ack消息的3种状态

Broker是根据Producer发送过来的状态码,来决定下一步的操作(提交、回滚、重试)

  1. TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message.
  2. TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume.
  3. TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status.

Producer实现2个接口方法:

实际上官方的这种模式,重试指的是check的重试而不是execute的重试,因为execute方法只会执行一次,要特别注意。

  • execute:最终执行本地事务,并Ack执行状态给Broker
  • check:检查Producer是否成功执行了事务,并Ack执行状态给Broker
    实际上是可以写在一个方法里面的,execute的时候先根据key进行check,已经执行了就Ack OK,没有的话就执行。执行成功Ack Ok,执行失败就Ack RollBack。
    但是这里官方把这个功能拆分的更细了,降低单一方法的复杂度

事务消息的优点

  1. 消息的投递失败时(比如MQ宕机或者网络丢失),Producer是可以感知到的,因为最终的业务提交是在回调的execute方法里面执行的
  2. 如果消息成功发送到Broker,但是没有Producer最终Commit Ack时(比如Producer宕机了),该事务消息仍然处于预提交的状态,不会被消费者读取到,这保证了消息在 P和C端的状态一致性

Producer代码

        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.29.129:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg =new Message("TopicTest", "TagA", "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

事务监听器代码

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String msgBody;
        //执行本地业务的时候,再插入一条数据到事务表中,供checkLocalTransaction进行check使用,避免doBusinessCommit业务成功,但是未返回Commit
        try {
            msgBody = new String(msg.getBody(), "utf-8");
            doBusinessCommit(msg.getKeys(),msgBody);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (UnsupportedEncodingException e) {
             e.printStackTrace();
             return LocalTransactionState.ROLLBACK_MESSAGE;
    
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Boolean result=checkBusinessStatus(msg.getKeys());
        if(result){
            return LocalTransactionState.COMMIT_MESSAGE;    
        }else{
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
    }
    
    public static void doBusinessCommit(String messageKey,String msgbody){
        System.out.println("do something in DataBase");
        System.out.println("insert 事务消息到本地消息表中,消息执行成功,messageKey为:"+messageKey);
    }
    
    public static Boolean checkBusinessStatus(String messageKey){
        if(true){
            System.out.println("查询数据库 messageKey为"+messageKey+"的消息已经消费成功了,可以提交消息");
            return true;
        }else{
            System.out.println("查询数据库 messageKey为"+messageKey+"的消息不存在或者未消费成功了,可以回滚消息");
            return false;
        }
    }

参考资料

阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/

阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html

阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358

推荐阅读更多精彩内容