消息发送

如果你曾经使用过RocketMQ,那么一定对以下发送消息的代码不陌生

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message(topic, new byte[] {'hello, world'});
producer.send(message);

Producer启动

image.png

其实仅仅一行代码,在produer端的后台启动了多个线程来协同工作,接下来我们逐一阐述

我们都知道,RocketMQ是一个集群部署、跨网络的产品,除了producer、consumer需要网络传输外,数据还需要在集群中流转。所以一个高效、可靠的网络组件是必不可少的。而RocketMQ选择了netty

使用netty首先需要考虑的便是业务上的数据粘包问题,netty提供了一些较为常用的解决方案,如:固定长度(比如每次发送的消息长度均为1024byte)、固定分隔符等。而RocketMQ使用的则是最为通用的head、body分离方式,即head存储消息的长度,body存储真正的消息数据,具体实现可参见类o.a.r.r.n.NettyRemotingClient

而消息收发这块,RocketMQ将所有的消息都收敛到同一个协议类o.a.r.r.p.RemotingCommand中,即消息发送、接收都会将其封装在该类中,这样做的好处是不言而喻的,即统一规范,减轻网络协议适配不同的消息类型带来的负担

其中较为重要的2个 ChannelHanlder 如下

org.apache.rocketmq.remoting.netty.NettyEncoder
消息编码,向 broker 或 nameServer 发送消息时使用,将RemotingCommand转换为byte[]形式
org.apache.rocketmq.remoting.netty.NettyDecoder
消息解码,将byte[]转换为RemotingCommand对象,接收 broker 返回的消息时,进行解码操作

消息格式

消息格式是什么概念?在《消息存储》章节不是已经阐述过消息格式了吗?其实这是两个概念,《消息存储》章节是消息真正落盘时候的存储格式,本小节的消息格式是指消息以什么样的形态交给netty从而在网络上进行传输

消息格式

消息格式是什么概念?在《消息存储》章节不是已经阐述过消息格式了吗?其实这是两个概念,《消息存储》章节是消息真正落盘时候的存储格式,本小节的消息格式是指消息以什么样的形态交给netty从而在网络上进行传输

消息格式由MsgHeader及MsgBody组成,而消息的长度、标记、版本等重要参数都放在 header 中,body 中仅仅存储数据,没有额外字段;我们主要看一下 header 的数据格式

image.png

而站在 netty 视角来看,不论是 msgHeader 还是 msgBody,都属于 netty 网络消息的body部分,所以我们可以简单画一张 netty 视角的消息格式

image.png

Msg Header的自动适配

上文得知,RocketMQ将所有的消息类型、收发都收敛到类RemotingCommand中,但RocketMQ消息类型众多,除了常见的消息发送、接收外,还有通过msgID查询消息、msgKey查询消息、获取broker配置、清理不再使用的topic等等,用一个类适配如此多的类型,具体是如何实现的呢?当新增、修改一种类型又该怎么应对呢?

翻看源码便发现,RemotingCommand的消息头定义为一个接口org.apache.rocketmq.remoting.CommandCustomHeader,不同类型的请求都实现这个接口,并在自己的子类中定义成员变量;那RemotingCommand的消息头又是如何自动解析呢?

public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }
        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

答案就是反射,通过反射获取子类的全部成员属性,并放入变量extFields中,makeCustomHeaderToNet()通过牺牲少量性能的方式,换取了程序极大的灵活性与扩展性,当新增请求类型时,仅需要编写新请求的encode、decode,不用修改其他类型请求的代码

image.png

Topic路由信息

Topic创建
发送消息的前置是需要创建一个topic,创建topic的admin命令如下

updateTopic -b <> -t <> -r <> -w <> -p <> -o <> -u <> -s <>

例如:
updateTopic -b 127.0.0.1:10911 -t testTopic -r 8 -w 8 -p 6 -o false -u false -s false

简单介绍下每个参数的作用

-b broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-c cluster 地址,表示 topic 所在 cluster,会向 cluster 中所有的 broker 发送请求
-t topic 名称
-r 可读队列数(默认为 8,后文还会展开)
-w 可写队列数(默认为 8,后文还会展开)
-p 指定新topic的读写权限 (W=2|R=4|WR=6)2表示当前topic仅可写入数据,4表示仅可读,6表示可读可写
-o set topic's order(true|false)
-u is unit topic (true|false)
-s has unit sub (true|false)

如果执行命令updateTopic -b 127.0.0.1:8899 -t testTopic -r 8 -w 8 意味着会在127.0.0.1:8899对应的broker下创建一个topic,这个topic的读写队列都是 8

那如果是这样的场景呢:集群A有3个master节点,当执行命令updateTopic -c clusterName -t testTopic -r 8 -w 8 后,站在集群A角度来看,当前topic总共创建了多少个写队列?其实 RocketMQ 接到这条命令后,会向3个 broker 分别发送创建 topic 的命令,这样每个broker上都会有8个读队列,8个写队列,所以站在集群的视角,这个 topic 总共会有 24 个读队列,24 个写队列

创建流程

一、创建Topic的客户端(DefaultMQAdminExt)
第一步:该客户端的启动流程与Producer、Consumer类似,需要start(),它们共用MQClientInstance#start()方法,启动后还有多个后台轮训线程
第二步:通过与NameServer交互,将指定ClusterName下所有的Broker信息拉下来
第三步:依次向这些Broker发送创建Topic的请求

二、Broker
第一步:Broker收到创建Topic的请求后,做一些新Topic的初始化动作,而后将该Topic的元数据存储在一个name为topics.json的本地文件中,因为在NameServer中并没有对数据进行持久化,所以此文件即为Topic路由数据的唯一持久化文件,当然这样的Broker一般是有多套的(其实此处是将所有json数据全部实例化后,替换本地文件,真实生产中,如果频繁创建、销毁topic,会带来大量的文件IO,以及内存负担,相信在未来近期的某个版本一定会进行修复)
第二步:向所有NameServer列表挨个发送Topic注册请求

三、NameServer
NameServer收到Broker注册Topic的消息后,便将其路由信息存储在内存中,当有Client请求Topic路由数据时,便将结果同步过去

我们以3个Broker、2个NameServer的集群举例:


image.png

Client → Broker :3 次网络IO,Client需要挨个向多个Broker发送注册请求
Broker → NameServer:6 次网络IO,Broker需要向所有NameServer发送注册请求

由此可见,NameServer确实是轻状态的节点,路由的原始数据其实都存储在Broker上,通过Broker向NameServer注册,再有Client从NameServer处获取元数据的方式来进行广播、同步。此方案是rmq独创,与kafka的重ZooKeeper形成对比,不过从实践角度看,该架构还是比较稳定的

writeQueueNum VS readQueueNum

首选需要明确的是,读、写队列,这两个概念是 RocketMQ 独有的,而 kafka 中只有一个partition的概念,不区分读写。一般情况下,这两个值建议设置为相等;我们分别看一下 client 端对它们的处理 (均在类MQClientInstance.java中)

producer端:

for (int i = 0; i < qd.getWriteQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    info.getMessageQueueList().add(mq);
}

consumer端

for (int i = 0; i < qd.getReadQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    mqList.add(mq);
}

如果2个队列设置不相等,例如我们设置6个写队列,4个读队列的话:

image.png

这样,4、5号队列中的数据一定不会被消费。

writeQueueNum > readQueueNum
大于 readQueueNum 部分的队列永远不会被消费
writeQueueNum < readQueueNum
所有队列中的数据都会被消费,但部分读队列数据一直是空的

这样设计有什么好处呢?其实是更精细的控制了读写操作,例如当我们要迁移 broker 时,可以首先将写入队列设置为0,将客户端引流至其他 broker 节点,等读队列数据也处理完毕后,再关闭 read 操作

路由数据格式

topic的路由数据如何由Admin发起创建,再被各个broker响应,继而被nameServer统一组织创建的流程我们暂且不讨论,为防止发散,我们直接从producer从nameServer获取路由数据开始。从nameServer获取到的路由数据格式如下

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

而存放路由数据的结构是queueDatas及brokerDatas

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

在此,简单阐述一下RocketMQ的cluster、brokerName、brokerId的概念


image.png

上图描述了一个cluster下有3个broker,每个broker又有1个master,2个slave组成;这也就是为什么类BrokerData中有HashMap<Long, String> brokerAddrs变量的原因,因为可能同一个brokerName下由多个节点组成。注:master节点的编号始终为0

Topic路由信息何时发生变化

这些路由信息什么时候发生变化呢?我们举例说明

举例1:某集群有3台 master,分别向其中的2台发送了创建topic的命令,此时所有的clent端都知道这个topic的数据在这两个broker上;这个时候通过admin向第3台broker发送创建topic命令,nameServer的路由信息便发生了变更,等client端30秒轮训后,便可以更新到最新的topic路由信息

举例2:某集群有3台 master,topic分别在3台broker上都创建了,此时某台broker宕机,nameServer将其摘除,等待30秒轮询后,client拿到最新路由信息

思考:client 端路由信息的变化是依托于30秒的轮询,如果路由信息已经发生变化,且轮询未发生,client端拿着旧的topic路由信息访问集群,一定会有短暂报错,此处也是待优化的点

定时更新Topic路由信息

RocketMQ会每隔30秒更新topic的路由信息

image.png

与Broker心跳

主要分为两部分:

1、清空无效broker
2、向有效的broker发送心跳
2.4.1、清空无效的broker
由上节得知,RocketMQ会获取所有已经注册的topic所在的broker信息,并将这些信息存储在变量brokerAddrTable中,brokerAddrTable的存储结构如下

key: brokerName,例如一个master带2个slave都属于同一个brokerName
val: HashMap<Long, String>,key为brokerId(其中master的brokerId固定为0),val为ip地址

如何判断某个broker有效无效呢?判断依据便是MQClientInstance#topicRouteTable,这个变量是上节中从nameServer中同步过来的,如果brokerAddrTable中有broker A、B、C,而topicRouteTable只有A、B的话,那么就需要从brokerAddrTable中删除C。

需要注意的是,在整个check及替换过程中都添加了独占锁lockNamesrv,而上节中维护更新topic路由信息也是指定的该锁

发送心跳数据
image.png

此处目的仅为与broker保持网络心跳,如果连接失败或发生异常,仅会打印日志,并不会有额外操作

多Producer

这里简单提一下,其实在单个进程中,是可以启动多个Producer的,且相互隔离;实现起来感觉也比较容易,感觉直接new DefaultMQProducer()就行。不过这里有个性能上的问题,就是如果两个Producer操作了同样的Topic,此时去NameServer拉取路由数据的时候,将会线性的放大,因此RMQ引入了MQClientInstance概念,即在单个进程中,MQClientInstance是单例的,诸如获取Topic路由数据等,均是其统一发起,读者在源码中看到这个类时不要觉得陌生哈

image.png

消息发送

image.png

消息发送比较重要的是2点内容

发送数据的负载均衡问题;RocketMQ默认采用的是轮训的方式
消息发送的方式;分同步、异步、单向

消息保序 vs 负载均衡

默认选择队列的策略为轮询方式,来保证消息可以均匀的分配到每个队列;

既然说到队列,就不得不提到消息的有序性问题

普通消息

消息是无序的,可发送至任意队列,producer 也不关心消息会存储在哪个队列。在这种模式下,如果发送失败,producer 会按照轮询的方式,重新选取下一个队列进行重试

producer.send(message);

普通有序消息

用户可根据消息内容来选择一个队列发送 ,在这种情况下,消息也一般是保序的,例如我们可以通过业务字段(例如用户id)的 msgKey 取模来选择队列,这样同样 msgKey 的消息必定会落在同一个队列中。

与发送普通消息不同,如果发送失败,将不会进行重试,也比较好理解,普通消息发送失败后,也不会针对当前队列进行重试,而是选择下一个队列

producer.send(zeroMsg, (mqs, msg, arg) -> {
    int index = msg.getKeys().hashCode() % mqs.size();
    return mqs.get(index);
}, 1000);

但也存在异常情况,例如当前 topic 的路由信息发生了变化,取模后消息可能命中了另外一个队列,自然也无法做到严格保序

严格有序消息

即 producer 自己严格发送给指定的队列,如果发送异常则快速失败,可见这种方式可以严格保证发送的消息在同一个队列中,即便 topic 路由信息发生变化,也可以严格保序

producer.send(message, messageQueue);

消息发送的3种方式

RocketMQ的rpc组件采用的是netty,而netty的网络请求设计是完全异步的,所以一个请求一定可以拆成以下3个步骤

a、客户端发送请求到服务器(由于完全异步,所以请求数据可能只放在了socket缓冲区,并没有出网卡)
b、服务器端处理请求(此过程不涉及网络开销,不过通常也是比较耗时的)
c、服务器向客户端返回应答(请求的response)

同步发送消息

SendResult result = producer.send(zeroMsg);

此过程比较好理解,即完成a、b、c所有步骤后才会返回,耗时也是 a + b + c 的总和

3.2.2、异步发送消息
通常在业务中发送消息的代码如下:

SendCallback sendCallback = new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // doSomeThing;
    }
    @Override
    public void onException(Throwable e) {
        // doSomeThing;
    }
};
producer.send(zeroMsg, sendCallback);

而RocketMQ处理异步消息的逻辑是,直接启动一个线程,而最终的结果异步回调SendCallback

ExecutorService executor = this.getAsyncSenderExecutor();
try {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
            } catch (Exception e) {
                sendCallback.onException(e);
            }
        }

    });
} catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
}

单向发送消息

producer.sendOneway(zeroMsg);

此模式与sync模式类似,都要经过producer端在数据发送前的数据组装工作,不过在将数据交给netty,netty调用操作系统函数将数据放入socket缓冲区后,所有的过程便已结束。什么场景会用到此模式呢?比如对可靠性要求并不高,但要求耗时非常短的场景,比如日志收集等

三个请求哪个更快呢?如果单论一个请求的话,肯定是async异步的方式最快,因为它直接把工作交给另外一个线程去完成,主线程直接返回了;但不论是async还是sync,它们都是需要将 a、b、c 3个步骤都走完的,所以总开销并不会减少。但oneWay因为只需将数据放入socket缓冲区后,client 端就直接返回了,少了监听并解析 server 端 response 的过程,所以可以得到最好的性能

小结

本文阐述了producer端相对重要的一些功能点,感觉比较核心的还是队列相关的概念;但RocketMQ发展迭代了这么多年,也涵盖了很多及细小的特性,本文不能穷尽,比如“消息的压缩”、“规避发送延迟较长的broker”、“超时异常”等等,这些功能点独立且零碎,读源码时可以带着问题跟进,这样针对性强,效率也会高很多

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容