RocketMQ支持3种消息发送方式:同步(sync)、异步(async)和单向(one way)。
- 同步:发送者向RocketMQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
- 异步:发送者向RocketMQ执行发送消息API时,指定消息发送成功后的回调函数,调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
- 单向:消息发送者向RocketMQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数。简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
RocketMQ消息发送需要考虑以下3个问题:
- 消息队列如何进行负载?
- 消息发送如何实现高可用?
- 批量消息发送如何实现一致性?
1. Topic路由机制
初次发送时会根据topic的名称向NameServer集群查询topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30s依次遍历缓存中的topic,向NameServer查询最新的路由信息。如果成功查询到路由信息,会将这些信息更新至本地缓存,实现topic路由信息的动态感知。
RocketMQ提供了自动创建主题(topic)的机制,消息发送者向一个不存在的主题发送消息时,向NameServer查询该主题的路由信息会先返回空,如果开启了自动创建主题机制,会使用一个默认的主题名再次从NameServer查询路由信息,然后消息发送者会使用默认主题的路由信息进行负载均衡,但不会直接使用默认路由信息为新主题创建对应的路由信息。使用默认主题创建路由信息的流程如图所示。

注意:RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由信息来自Broker的心跳包并存储在内存中。
2. 消息发送高可用设计
发送端在自动发现主题的路由信息后,RocketMQ默认使用轮询算法进行路由的负载均衡。RocketMQ在消息发送时支持自定义的队列负载算法,需要特别注意的是,使用自定义的路由负载算法后,RocketMQ的重试机制将失效。RocketMQ为了实现消息发送高可用,引入了两个非常重要的特性:
- 消息发送重试机制。RocketMQ在消息发送时如果出现失败,默认会重试两次。
- 故障规避机制。当消息第一次发送失败时,如果下一次消息还是发送到刚刚失败的Broker上,其消息发送大概率还是会失败,因此为了保证重试的可靠性,在重试时会尽量避开刚刚接收失败的Broker,而是选择其他Broker上的队列进行发送,从而提高消息发送的成功率。
消息发送的高可用性设计如图所示:

3. 消息发送流程
消息生产者启动流程在DefaultMQProducerImpl#start中,记录了也记不住,这里不赘述了,主要记录核心业务流程。
RocketMQ消息发送的关键点如图所示:

消息发送流程主要的步骤为验证消息、查找路由、消息发送(包含异常处理机制),如下所示:
DefaultMQProducerImpl#send
public SendResult send(Message msg) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
这里重点介绍下Broker的故障延迟机制。
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final
String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 轮训队列
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证该消息队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// lastBrokerName就是上一次选择的执行发送消息失败的Broker
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %
writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
4. 批量消息发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多,性能就越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过Default MQProducer#maxMessageSize。批量发送消息要解决的是如何将这些消息编码,以便服务端能够正确解码每条消息的内容。
发送单条消息时,消息体的内容将保存在body中。发送批量消息时,需要将多条消息体的内容存储在body中。如何存储更便于服务端正确解析每条消息呢?RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储,如图所示:

首先在消息发送端,调用batch()方法,将一批消息封装成MessageBatch对象。Message-Batch继承自Message对象,内部持有List<Message> messages。这样一来,批量消息发送与单条消息发送的处理流程就完全一样了。MessageBatch只需要将该集合中每条消息的消息体聚合成一个byte[]数组,在消息服务端能够从该byte[]数组中正确解析出消息。
MessageDecoder#encodeMessage
public static byte[] encodeMessage(Message message) {
byte[] body = message.getBody(); int
bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
propertiesLength = (short) propertiesBytes.length; int
sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}