一定让你看懂的RocketMQ事务消息源码分析

前言

得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是经历了多年的双十一高并发挑战,其中4.3.0版本推出了事务消息的新特性,本文对RocketMQ 4.5.0版本事务消息相关的源码跟踪介绍,通过阅读读者可以知道:

事务消息解决什么样的问题

事务消息的实现原理及其设计亮点

解决什么问题

假设我所在的系统现在有这样一个场景:

本地开启数据库事务进行扣款操作,成功后发送MQ消息给库存中心进行发货。

有人会想到开启mybatis事务实现,把本地事务和MQ消息放在一起不就行了吗?如果MQ发送成功,就提交事务,发送失败就回滚事务,整套操作一气呵成。

transaction{  扣款();booleansuccess = 发送MQ();if(success){    commit();  }else{    rollBack();  }}

看似没什么问题,但是网络是不可靠的。

假设MQ返回过来的响应因为网络原因迟迟没有收到,所以在面对不确定的MQ返回结果只好进行回滚。但是MQ 服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。


既然MQ消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢?答案就是今天我们介绍的主角:事务消息

概览


总体而言RocketMQ事务消息分为两条主线

定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果

定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果

因此本文也通过这两条主线对源码进行分析

源码分析

半消息发送流程

本地应用(client)

在本地应用发送事务消息的核心类是TransactionMQProducer,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法

@OverridepublicTransactionSendResultsendMessageInTransaction(finalMessage msg,finalObject arg)throwsMQClientException{if(null==this.transactionListener) {thrownewMQClientException("TransactionListener is null",null);    }returnthis.defaultMQProducerImpl.sendMessageInTransaction(msg,null, arg);}

这个方法做了两件事,

检查transactionListener是否存在

调用父类执行事务消息发送

TransactionListener在事务消息流程中起到至关重要的作用,一起看看这个接口

publicinterfaceTransactionListener{/**    * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.    *    *@parammsg Half(prepare) message    *@paramarg Custom business parameter    *@returnTransaction state    */LocalTransactionStateexecuteLocalTransaction(finalMessage msg,finalObject arg);/**    * When no response to prepare(half) message. broker will send check message to check the transaction status, and this    * method will be invoked to get local transaction status.    *    *@parammsg Check message    *@returnTransaction state    */LocalTransactionStatecheckLocalTransaction(finalMessageExt msg);}

接口注释说的很明白,配合上面的概览图来看就是,executeLocalTransaction方法对应的就是执行本地事务操作,checkLocalTransaction对应的就是回查本地事务操作。

下面是DefaultMQProducer类的sendMessageInTransaction方法源码

publicTransactionSendResultsendMessageInTransaction(finalMessage msg,finalLocalTransactionExecuter localTransactionExecuter,finalObject arg)throwsMQClientException{    ...    SendResult sendResult =null;    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED,"true");    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,this.defaultMQProducer.getProducerGroup());    ...        sendResult =this.send(msg);    ...switch(sendResult.getSendStatus()) {caseSEND_OK: {            ...        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                ...break;caseFLUSH_DISK_TIMEOUT:caseFLUSH_SLAVE_TIMEOUT:caseSLAVE_NOT_AVAILABLE:            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;    }    ...this.endTransaction(sendResult, localTransactionState, localException);...}复制代码

为了使源码的逻辑更加直观,笔者精简了核心代码。sendMessageInTransaction方法主要做了以下事情

给消息打上事务消息相关的标记,用于MQ服务端区分普通消息和事务消息

发送半消息(half message)

发送成功则由transactionListener执行本地事务

执行endTransaction方法,如果半消息发送失败本地事务执行失败告诉服务端是删除半消息,半消息发送成功本地事务执行成功则告诉服务端生效半消息。

发送半消息流程,Client端代码到这里差不多就结束了,接下来看看RocketMQ Server端是如何处理的

RocketMQ Server

Server在接收到消息过后会进行一些领域对象的转化和是否支持事务消息的权限校验,对理解事务消息用处不大,此处就省略对旁枝末节的介绍了。下面是TransactionalMessageBridge类处理half message的源码

publicPutMessageResultputHalfMessage(MessageExtBrokerInner messageInner){returnstore.putMessage(parseHalfMessageInner(messageInner));}privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInner msgInner){    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,        String.valueOf(msgInner.getQueueId()));    msgInner.setSysFlag(        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());    msgInner.setQueueId(0);    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));returnmsgInner;}

这两个方法主要做了以下事情:

publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID =8445773977080406428L;privateString topic;privateintflag;privateMap properties;privatebyte[] body;privateString transactionId;}

将消息的topic,queueId放进消息体自身的map里进行缓存

将消息的topic 设置为“RMQ_SYS_TRANS_OP_HALF_TOPIC”,queueId设置为0

将消息写入磁盘持久化

可以看到所有的事务半消息都会被放进同一个topic的同一个queue里面,通过对topic的区分,从而避免了半消息被consumer给消费到

Server将半消息持久化后然后会发送结果给我们本地的应用程序。到了这里Server端对半消息的处理就结束了,紧接着的是定时任务的登场。

定时任务回查流程

RocketMQ Server

定时任务是一个叫TransactionalMessageService类的线程,下面是该类的check方法

@Overridepublicvoidcheck(longtransactionTimeout,inttransactionCheckMax,    AbstractTransactionalMessageCheckListener listener){                  ...if(!putBackHalfMsgQueue(msgExt, i)) {continue;    }      listener.resolveHalfMsg(msgExt);  } ...}

check方法非常长,省略的代码大致都是对半消息进行过滤(如超过72小时的事务消息,就被算作过期),只保留符合条件的半消息对其进行回查。

其中很有意思的是putBackHalfMsgQueue方法,因为每次把半消息从磁盘拉到内存里进行处理都会对其属性进行改变(例如TRANSACTION_CHECK_TIMES,这是是否丢弃事务消息的关键信息),所以在发送回查消息之前需要对半消息再次放进磁盘。RocketMQ采取的方法是基于最新的物理偏移量重新写入,而不是对原有的半消息进行修改,其中的目的就是RocketMQ的存储设计采用顺序写,如果去修改消息 ,无法做到高性能。

下面是resolveHalfMsg方法,主要就是开启一个线程然后发送check消息。

publicvoidresolveHalfMsg(finalMessageExt msgExt){    executorService.execute(newRunnable() {@Overridepublicvoidrun(){try{                sendCheckMessage(msgExt);            }catch(Exception e) {                LOGGER.error("Send check message error!", e);            }        }    });}

本地应用(client)

下面是DefaultMQProducerImpl的checkTransactionState方法,是本地应用对回查消息的处理逻辑

@OverridepublicvoidcheckTransactionState(finalString addr,finalMessageExt msg,finalCheckTransactionStateRequestHeader header){    Runnable request =newRunnable() {        ...@Overridepublicvoidrun(){            ...    TransactionListener transactionListener = getCheckListener();            ...    localTransactionState = transactionListener.checkLocalTransaction(message);              ...this.processTransactionState(                    localTransactionState,                    group,                    exception);                }privatevoidprocessTransactionState(          ... DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);          ...        }    };this.checkExecutor.submit(request);}

精简代码逻辑后可以清晰的看到

开启一个线程来执行回查的逻辑

执行transactionListener的checkLocalTransaction方法来获取本地事务执行的结果

RocketMQ Server

RocketMQ 服务器在收到Client发过来的Commit消息后会

读出半消息——>恢复topic等原消息体的信息——>和普通消息一样再次写入磁盘——>删除之前的半消息

如果是Rollback消息则直接删除之前的半消息

到此,整条RocketMQ 事务消息的调用链就结束了

思考

1. 分布式事务等于事务消息吗?

两者并没有关系,事务消息仅仅保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,消费者是否能一定消费成功是无法保证的。

2. 源码设计上有什么亮点吗?

通过对整条链路源码的学习理解发现还是有不少亮点的

server端回查消息的发送,client端回查消息逻辑的处理,client端commit/rollback消息的提交都是用了异步进行,可以说能异步的地方都用了异步,通过异步+重试的方式保证了在分布式环境中即使短暂的网络状况不良好,也不会影响整体逻辑。

引入TransactionListener,真正做到了开闭原则以及依赖倒置原则,面向接口编程。整体扩展性做得非常好,使用者只需要编写自己的Listener就可以做到事务消息的发送,非常方便

TransactionMQProducer通过继承DefaultMQProducer极大地复用了关于发送消息相关的逻辑

3. 源码设计上有什么不足吗?

RocketMQ作为一款极其成功的消息中间件,要发现不足不是那么容易了,笔者谈几点看法

sendMessageIntransaction等事务相关的方法被划分在了DefaultMQProducer里面,从内聚的角度来说这是跟事务相关的发送消息方法应该被划分在TransactionMQProducer。

所有topic的半消息都会写在topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容