rocketmq如何实现分布式事务

如果同一个数据源在本地事物很好控制,但是在不断发展的互联网环境下,微服务越来越流行,这个时候,需要解决分布式事物,需要保证数据的最终一致性。
所谓分布式事务(全局事物),就是在两个不同的系统中(即两个不同的数据源,其实同一个数据源也行),无法同一个同一个spring事物去控制不同系统的事物的整体成功或者整体失败。

分布式事务的解决方法很多,但是性能和复杂性不一样,今天说说rocketmq如何保证分布式事务的。

rocketmq为我们提供TransactionMQProducer API的支持。这个api在发送消息的时候主要做了三件事:
1,先发送需要发送的消息到消息中间件broker,并获取到该message的transactionId。在第一次发送的时候,该消息的状态为LocalTransactionState.UNKNOW
2,处理本地事物。
3,根据本地事物的执行结果,结合transactionId,找到该消息的位置,在mq中标志该消息的最终处理结果。
整体的执行源码逻辑如下:


public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

            try {
                //这里执行第一次发送消息,也就是预发送,并获取sendResult,这里包含msg的所有消息
                sendResult = this.send(msg);
            } catch (Exception var10) {
                throw new MQClientException("send message Exception", var10);
            }

            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            //根据预发送消息的状态做不同的处理,这里主要看SEND_OK
            switch(sendResult.getSendStatus()) {
            case SEND_OK:
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }

// 这里做第二步,执行业务逻辑,即本地事物,
//具体的本地事物在LocalTransactionExecuter参数的实现类中,
//需要根据自己的业务逻辑去写,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);会执行实
//现类中的executeLocalTransactionBranch业务。
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (Throwable var9) {
                    this.log.info("executeLocalTransactionBranch exception", var9);
                    this.log.info(msg.toString());
                    localException = var9;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }

            try {
// 这里的方法,其中的localTransactionState是第二次执行业务逻辑的结果
//可以根据这个结果,知道本地事物执行的成功还是失败。或者是异常localException,
//这样可以根据第一次发送消息的结果sendResult,去修改mq中第一次发送消息的状态,完成第三步操作。
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception var8) {
                this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
            }

            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    }

上述第三步执行的方法endTransaction,逻辑如下:


 public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

// 获取第一次发送消息的id
        MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

    //获取事物id
        String transactionId = sendResult.getTransactionId();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());

       //根据本地事物执行状态localTransactionState,告知mq修改状态

        switch(localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(8));
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(12));
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(Integer.valueOf(0));
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
//具体执行第三步完成整个事务。      
  this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
    }

上述:
如果第三阶段出现异常或者网络原因,就是本地事务执行成功持久化到数据库中,但是在修改mq中消息状态出现异常的时候,这样就可以出现本地和mq的消息状态的不一致问题。或者说,所有的数据不一致问题。rocketmq都会定期通过TransactionMQProducer API初始化的时候,设置的TransactionCheckListener的的实现类的checkLocalTransactionState 方法检查本地消息的状态,根据本地状态修改mq的状态

package org.apache.rocketmq.client.producer;

import org.apache.rocketmq.common.message.MessageExt;

public interface TransactionCheckListener {
    LocalTransactionState checkLocalTransactionState(MessageExt var1);
}

这样就可以确保生产端的数据要么是本地事物执行失败,rollback mq中消息,或者本地事物执行成功,mq中的消息也是待消费的状态。

说到这里,其实分布式事务已经完成90%。如果生产端的的第三部不是成功状态,消费端是不会消费第一阶段的消息的。所以,只要消费端消费的消息,肯定是生产端成功的消息,而消费端只要保证幂等性,就可以准确的消费消息(因为消费失败了,mq是可以重试,如果最总都没有被消费,基本是程序有bug,可以事后人工处理了。)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352