使用RocketMQ实现分布式事务

什么是rocketmq事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

RocketMQ的分布式事务又称为“半消息事务”。

事务消息处理流程

RocketMQ是靠半消息机制实现分布式事务

事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。

半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

半消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

事务消息交互流程如下图所示。

1. 生产者将消息发送至Apache RocketMQ服务端。

2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3. 生产者开始执行本地事务逻辑。

4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

• 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

• 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

• 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

• 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

• 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

• 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

• 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

• 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

• 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

示例

下面是使用 RocketMQ 实现事务的一个例子:

生产者实现事务监听器:

首先,需要实现一个 RocketMQ 的事务监听器接口RocketMQLocalTransactionListener,这个接口定义了在发送和确认事务消息时的回调方法。您需要根据业务逻辑来实现这些方法。

executeLocalTransaction 方法:

这个方法在发送事务消息时被调用,用于执行本地事务。具体步骤如下:

1. 获取消息中的事务 ID。

2. 根据事务索引来模拟本地事务执行的状态。

3. 将事务状态放入 localTrans 映射中,以备后续 checkLocalTransaction 方法使用。

根据您的代码,executeLocalTransaction 方法中模拟了三种状态:

• 如果状态为 0,表示本地事务成功,返回 RocketMQLocalTransactionState.COMMIT,消息将被提交。

• 如果状态为 1,表示本地事务失败,返回 RocketMQLocalTransactionState.ROLLBACK,消息将被回滚。

• 如果状态为 2,表示本地事务状态未知,返回 RocketMQLocalTransactionState.UNKNOWN

checkLocalTransaction 方法:

这个方法在消息的确认状态时被调用,用于检查本地事务的状态。具体步骤如下:

1. 获取消息中的事务 ID。

2. 根据之前保存在 localTrans 映射中的事务状态,决定消息的提交、回滚或未知。

checkLocalTransaction 方法会根据之前在 executeLocalTransaction 方法中保存的状态来返回相应的事务状态。

@RocketMQTransactionListener  
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {  
    private AtomicInteger transactionIndex = new AtomicInteger(0);  
  
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();  
  
    @Override  
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {  
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  
        System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",  
        transId);  
        int value = transactionIndex.getAndIncrement();  
        int status = value % 3;  
        localTrans.put(transId, status);  
        if (status == 0) {  
            // Return local transaction with success(commit), in this case,  
            // this message will not be checked in checkLocalTransaction()  
            System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());  
            return RocketMQLocalTransactionState.COMMIT;  
        }  

        if (status == 1) {  
            // Return local transaction with failure(rollback) , in this case,  
            // this message will not be checked in checkLocalTransaction()  
            System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());  
            return RocketMQLocalTransactionState.ROLLBACK;  
        }  

        System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");  
        return RocketMQLocalTransactionState.UNKNOWN;  
    }  

    @Override  
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {  
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;  
        Integer status = localTrans.get(transId);  
        if (null != status) {  
            switch (status) {  
            case 0:  
                retState = RocketMQLocalTransactionState.COMMIT;  
                break;  
            case 1:  
                retState = RocketMQLocalTransactionState.ROLLBACK;  
                break;  
            case 2:  
                retState = RocketMQLocalTransactionState.UNKNOWN;  
                break;  
            }  
        }  
        System.out.printf("------ !!! checkLocalTransaction is executed once," +  
        " msgTransactionId=%s, TransactionState=%s status=%s %n",  
        transId, retState, status);  
        return retState;  
    }  }

消费者

@Service  
@RocketMQMessageListener(topic = "${demo.rocketmq.transTopic}", consumerGroup = "string_trans_consumer")  
public class StringTransactionalConsumer implements RocketMQListener<String> {  
    @Override  
    public void onMessage(String message) {  
        System.out.printf("------- StringTransactionalConsumer received: %s \n", message);  
    }  }

这些步骤基本上涵盖了使用 RocketMQ 实现事务的主要过程。可以根据具体的业务需求和环境进行调整和配置。

总结

使用半消息实现分布式事务在提供分布式事务支持和保证消息传递的原子性方面具有优势,但需要引入MQ并提供查询事务接口。在选择是否使用半消息实现分布式事务时,需要根据具体的业务需求和系统性能要求来进行权衡和选择。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容