RocketMQ消息发送主要分为以下几步:
- 根据主题获取路由信息(主题对应的所有队列,队列所在broker,broker的IP地址和端口号)
- 选择其中一条队列
- 发送消息
- 成功返回,失败重试,同步和异步默认2次,单向发送不重试
详细流程
获取路由信息
public class TopicRouteData extends RemotingSerializable {
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
}
先从本地缓存获取,如果没有再从NameServer获取,从NameServer获取时会先尝试获取一个ReentrantLock,目的是防止并发请求,主题的路由信息更新并不频繁。另外生产者端还会每30s获取一次路由信息。
获取到路由信息后,还要跟本地缓存相比较,如果修改了,则更新发布和订阅者的路由信息。同一个进程可以启多个不同组的生产者和消费者,但客户端实例(MQClientInstance)是单例的
选择队列
首次随机选择一个队列,后面队列ID加一取模,并且不选择上一次的broker,通过ThreadLocal保证线程安全
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
//当index=Integer.MIN_VALUE时,Math.abs是负数,仍为MIN_VALUE
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
失败延迟机制:每次发送成功或是异常,都会记录发送时间,并根据发送时间计算broker不可用时间戳,如果发送成功但是大于等于550ms,会标记该broker 30s、60s、120s、180s、600s不可用,如果发送失败,会标记该broker 15分钟不可用,这段时间内不再向该broker发送消息
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
//如果发送成功,isolation是false,发送失败为true,会标记该broker 15分钟不可用
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
//latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}