如果你曾经使用过RocketMQ,那么一定对以下发送消息的代码不陌生
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message(topic, new byte[] {'hello, world'});
producer.send(message);
Producer启动
其实仅仅一行代码,在produer端的后台启动了多个线程来协同工作,接下来我们逐一阐述
我们都知道,RocketMQ是一个集群部署、跨网络的产品,除了producer、consumer需要网络传输外,数据还需要在集群中流转。所以一个高效、可靠的网络组件是必不可少的。而RocketMQ选择了netty
使用netty首先需要考虑的便是业务上的数据粘包问题,netty提供了一些较为常用的解决方案,如:固定长度(比如每次发送的消息长度均为1024byte)、固定分隔符等。而RocketMQ使用的则是最为通用的head、body分离方式,即head存储消息的长度,body存储真正的消息数据,具体实现可参见类o.a.r.r.n.NettyRemotingClient
而消息收发这块,RocketMQ将所有的消息都收敛到同一个协议类o.a.r.r.p.RemotingCommand中,即消息发送、接收都会将其封装在该类中,这样做的好处是不言而喻的,即统一规范,减轻网络协议适配不同的消息类型带来的负担
其中较为重要的2个 ChannelHanlder 如下
org.apache.rocketmq.remoting.netty.NettyEncoder
消息编码,向 broker 或 nameServer 发送消息时使用,将RemotingCommand转换为byte[]形式
org.apache.rocketmq.remoting.netty.NettyDecoder
消息解码,将byte[]转换为RemotingCommand对象,接收 broker 返回的消息时,进行解码操作
消息格式
消息格式是什么概念?在《消息存储》章节不是已经阐述过消息格式了吗?其实这是两个概念,《消息存储》章节是消息真正落盘时候的存储格式,本小节的消息格式是指消息以什么样的形态交给netty从而在网络上进行传输
消息格式
消息格式是什么概念?在《消息存储》章节不是已经阐述过消息格式了吗?其实这是两个概念,《消息存储》章节是消息真正落盘时候的存储格式,本小节的消息格式是指消息以什么样的形态交给netty从而在网络上进行传输
消息格式由MsgHeader及MsgBody组成,而消息的长度、标记、版本等重要参数都放在 header 中,body 中仅仅存储数据,没有额外字段;我们主要看一下 header 的数据格式
而站在 netty 视角来看,不论是 msgHeader 还是 msgBody,都属于 netty 网络消息的body部分,所以我们可以简单画一张 netty 视角的消息格式
Msg Header的自动适配
上文得知,RocketMQ将所有的消息类型、收发都收敛到类RemotingCommand中,但RocketMQ消息类型众多,除了常见的消息发送、接收外,还有通过msgID查询消息、msgKey查询消息、获取broker配置、清理不再使用的topic等等,用一个类适配如此多的类型,具体是如何实现的呢?当新增、修改一种类型又该怎么应对呢?
翻看源码便发现,RemotingCommand的消息头定义为一个接口org.apache.rocketmq.remoting.CommandCustomHeader,不同类型的请求都实现这个接口,并在自己的子类中定义成员变量;那RemotingCommand的消息头又是如何自动解析呢?
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
答案就是反射,通过反射获取子类的全部成员属性,并放入变量extFields中,makeCustomHeaderToNet()通过牺牲少量性能的方式,换取了程序极大的灵活性与扩展性,当新增请求类型时,仅需要编写新请求的encode、decode,不用修改其他类型请求的代码
Topic路由信息
Topic创建
发送消息的前置是需要创建一个topic,创建topic的admin命令如下
updateTopic -b <> -t <> -r <> -w <> -p <> -o <> -u <> -s <>
例如:
updateTopic -b 127.0.0.1:10911 -t testTopic -r 8 -w 8 -p 6 -o false -u false -s false
简单介绍下每个参数的作用
-b broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-c cluster 地址,表示 topic 所在 cluster,会向 cluster 中所有的 broker 发送请求
-t topic 名称
-r 可读队列数(默认为 8,后文还会展开)
-w 可写队列数(默认为 8,后文还会展开)
-p 指定新topic的读写权限 (W=2|R=4|WR=6)2表示当前topic仅可写入数据,4表示仅可读,6表示可读可写
-o set topic's order(true|false)
-u is unit topic (true|false)
-s has unit sub (true|false)
如果执行命令updateTopic -b 127.0.0.1:8899 -t testTopic -r 8 -w 8 意味着会在127.0.0.1:8899对应的broker下创建一个topic,这个topic的读写队列都是 8
那如果是这样的场景呢:集群A有3个master节点,当执行命令updateTopic -c clusterName -t testTopic -r 8 -w 8 后,站在集群A角度来看,当前topic总共创建了多少个写队列?其实 RocketMQ 接到这条命令后,会向3个 broker 分别发送创建 topic 的命令,这样每个broker上都会有8个读队列,8个写队列,所以站在集群的视角,这个 topic 总共会有 24 个读队列,24 个写队列
创建流程
一、创建Topic的客户端(DefaultMQAdminExt)
第一步:该客户端的启动流程与Producer、Consumer类似,需要start(),它们共用MQClientInstance#start()方法,启动后还有多个后台轮训线程
第二步:通过与NameServer交互,将指定ClusterName下所有的Broker信息拉下来
第三步:依次向这些Broker发送创建Topic的请求
二、Broker
第一步:Broker收到创建Topic的请求后,做一些新Topic的初始化动作,而后将该Topic的元数据存储在一个name为topics.json的本地文件中,因为在NameServer中并没有对数据进行持久化,所以此文件即为Topic路由数据的唯一持久化文件,当然这样的Broker一般是有多套的(其实此处是将所有json数据全部实例化后,替换本地文件,真实生产中,如果频繁创建、销毁topic,会带来大量的文件IO,以及内存负担,相信在未来近期的某个版本一定会进行修复)
第二步:向所有NameServer列表挨个发送Topic注册请求
三、NameServer
NameServer收到Broker注册Topic的消息后,便将其路由信息存储在内存中,当有Client请求Topic路由数据时,便将结果同步过去
我们以3个Broker、2个NameServer的集群举例:
Client → Broker :3 次网络IO,Client需要挨个向多个Broker发送注册请求
Broker → NameServer:6 次网络IO,Broker需要向所有NameServer发送注册请求
由此可见,NameServer确实是轻状态的节点,路由的原始数据其实都存储在Broker上,通过Broker向NameServer注册,再有Client从NameServer处获取元数据的方式来进行广播、同步。此方案是rmq独创,与kafka的重ZooKeeper形成对比,不过从实践角度看,该架构还是比较稳定的
writeQueueNum VS readQueueNum
首选需要明确的是,读、写队列,这两个概念是 RocketMQ 独有的,而 kafka 中只有一个partition的概念,不区分读写。一般情况下,这两个值建议设置为相等;我们分别看一下 client 端对它们的处理 (均在类MQClientInstance.java中)
producer端:
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
consumer端
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
如果2个队列设置不相等,例如我们设置6个写队列,4个读队列的话:
这样,4、5号队列中的数据一定不会被消费。
writeQueueNum > readQueueNum
大于 readQueueNum 部分的队列永远不会被消费
writeQueueNum < readQueueNum
所有队列中的数据都会被消费,但部分读队列数据一直是空的
这样设计有什么好处呢?其实是更精细的控制了读写操作,例如当我们要迁移 broker 时,可以首先将写入队列设置为0,将客户端引流至其他 broker 节点,等读队列数据也处理完毕后,再关闭 read 操作
路由数据格式
topic的路由数据如何由Admin发起创建,再被各个broker响应,继而被nameServer统一组织创建的流程我们暂且不讨论,为防止发散,我们直接从producer从nameServer获取路由数据开始。从nameServer获取到的路由数据格式如下
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
而存放路由数据的结构是queueDatas及brokerDatas
public class QueueData implements Comparable<QueueData> {
private String brokerName;
private int readQueueNums;
private int writeQueueNums;
}
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
在此,简单阐述一下RocketMQ的cluster、brokerName、brokerId的概念
上图描述了一个cluster下有3个broker,每个broker又有1个master,2个slave组成;这也就是为什么类BrokerData中有HashMap<Long, String> brokerAddrs变量的原因,因为可能同一个brokerName下由多个节点组成。注:master节点的编号始终为0
Topic路由信息何时发生变化
这些路由信息什么时候发生变化呢?我们举例说明
举例1:某集群有3台 master,分别向其中的2台发送了创建topic的命令,此时所有的clent端都知道这个topic的数据在这两个broker上;这个时候通过admin向第3台broker发送创建topic命令,nameServer的路由信息便发生了变更,等client端30秒轮训后,便可以更新到最新的topic路由信息
举例2:某集群有3台 master,topic分别在3台broker上都创建了,此时某台broker宕机,nameServer将其摘除,等待30秒轮询后,client拿到最新路由信息
思考:client 端路由信息的变化是依托于30秒的轮询,如果路由信息已经发生变化,且轮询未发生,client端拿着旧的topic路由信息访问集群,一定会有短暂报错,此处也是待优化的点
定时更新Topic路由信息
RocketMQ会每隔30秒更新topic的路由信息
与Broker心跳
主要分为两部分:
1、清空无效broker
2、向有效的broker发送心跳
2.4.1、清空无效的broker
由上节得知,RocketMQ会获取所有已经注册的topic所在的broker信息,并将这些信息存储在变量brokerAddrTable中,brokerAddrTable的存储结构如下
key: brokerName,例如一个master带2个slave都属于同一个brokerName
val: HashMap<Long, String>,key为brokerId(其中master的brokerId固定为0),val为ip地址
如何判断某个broker有效无效呢?判断依据便是MQClientInstance#topicRouteTable,这个变量是上节中从nameServer中同步过来的,如果brokerAddrTable中有broker A、B、C,而topicRouteTable只有A、B的话,那么就需要从brokerAddrTable中删除C。
需要注意的是,在整个check及替换过程中都添加了独占锁lockNamesrv,而上节中维护更新topic路由信息也是指定的该锁
发送心跳数据
此处目的仅为与broker保持网络心跳,如果连接失败或发生异常,仅会打印日志,并不会有额外操作
多Producer
这里简单提一下,其实在单个进程中,是可以启动多个Producer的,且相互隔离;实现起来感觉也比较容易,感觉直接new DefaultMQProducer()就行。不过这里有个性能上的问题,就是如果两个Producer操作了同样的Topic,此时去NameServer拉取路由数据的时候,将会线性的放大,因此RMQ引入了MQClientInstance概念,即在单个进程中,MQClientInstance是单例的,诸如获取Topic路由数据等,均是其统一发起,读者在源码中看到这个类时不要觉得陌生哈
消息发送
消息发送比较重要的是2点内容
发送数据的负载均衡问题;RocketMQ默认采用的是轮训的方式
消息发送的方式;分同步、异步、单向
消息保序 vs 负载均衡
默认选择队列的策略为轮询方式,来保证消息可以均匀的分配到每个队列;
既然说到队列,就不得不提到消息的有序性问题
普通消息
消息是无序的,可发送至任意队列,producer 也不关心消息会存储在哪个队列。在这种模式下,如果发送失败,producer 会按照轮询的方式,重新选取下一个队列进行重试
producer.send(message);
普通有序消息
用户可根据消息内容来选择一个队列发送 ,在这种情况下,消息也一般是保序的,例如我们可以通过业务字段(例如用户id)的 msgKey 取模来选择队列,这样同样 msgKey 的消息必定会落在同一个队列中。
与发送普通消息不同,如果发送失败,将不会进行重试,也比较好理解,普通消息发送失败后,也不会针对当前队列进行重试,而是选择下一个队列
producer.send(zeroMsg, (mqs, msg, arg) -> {
int index = msg.getKeys().hashCode() % mqs.size();
return mqs.get(index);
}, 1000);
但也存在异常情况,例如当前 topic 的路由信息发生了变化,取模后消息可能命中了另外一个队列,自然也无法做到严格保序
严格有序消息
即 producer 自己严格发送给指定的队列,如果发送异常则快速失败,可见这种方式可以严格保证发送的消息在同一个队列中,即便 topic 路由信息发生变化,也可以严格保序
producer.send(message, messageQueue);
消息发送的3种方式
RocketMQ的rpc组件采用的是netty,而netty的网络请求设计是完全异步的,所以一个请求一定可以拆成以下3个步骤
a、客户端发送请求到服务器(由于完全异步,所以请求数据可能只放在了socket缓冲区,并没有出网卡)
b、服务器端处理请求(此过程不涉及网络开销,不过通常也是比较耗时的)
c、服务器向客户端返回应答(请求的response)
同步发送消息
SendResult result = producer.send(zeroMsg);
此过程比较好理解,即完成a、b、c所有步骤后才会返回,耗时也是 a + b + c 的总和
3.2.2、异步发送消息
通常在业务中发送消息的代码如下:
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// doSomeThing;
}
@Override
public void onException(Throwable e) {
// doSomeThing;
}
};
producer.send(zeroMsg, sendCallback);
而RocketMQ处理异步消息的逻辑是,直接启动一个线程,而最终的结果异步回调SendCallback
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
单向发送消息
producer.sendOneway(zeroMsg);
此模式与sync模式类似,都要经过producer端在数据发送前的数据组装工作,不过在将数据交给netty,netty调用操作系统函数将数据放入socket缓冲区后,所有的过程便已结束。什么场景会用到此模式呢?比如对可靠性要求并不高,但要求耗时非常短的场景,比如日志收集等
三个请求哪个更快呢?如果单论一个请求的话,肯定是async异步的方式最快,因为它直接把工作交给另外一个线程去完成,主线程直接返回了;但不论是async还是sync,它们都是需要将 a、b、c 3个步骤都走完的,所以总开销并不会减少。但oneWay因为只需将数据放入socket缓冲区后,client 端就直接返回了,少了监听并解析 server 端 response 的过程,所以可以得到最好的性能
小结
本文阐述了producer端相对重要的一些功能点,感觉比较核心的还是队列相关的概念;但RocketMQ发展迭代了这么多年,也涵盖了很多及细小的特性,本文不能穷尽,比如“消息的压缩”、“规避发送延迟较长的broker”、“超时异常”等等,这些功能点独立且零碎,读源码时可以带着问题跟进,这样针对性强,效率也会高很多