RocketMQ消息发送流程

RocketMQ消息发送主要分为以下几步:

  • 根据主题获取路由信息(主题对应的所有队列,队列所在broker,broker的IP地址和端口号)
  • 选择其中一条队列
  • 发送消息
  • 成功返回,失败重试,同步和异步默认2次,单向发送不重试

详细流程

获取路由信息

public class TopicRouteData extends RemotingSerializable {
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
}

先从本地缓存获取,如果没有再从NameServer获取,从NameServer获取时会先尝试获取一个ReentrantLock,目的是防止并发请求,主题的路由信息更新并不频繁。另外生产者端还会每30s获取一次路由信息。

获取到路由信息后,还要跟本地缓存相比较,如果修改了,则更新发布和订阅者的路由信息。同一个进程可以启多个不同组的生产者和消费者,但客户端实例(MQClientInstance)是单例的

选择队列

首次随机选择一个队列,后面队列ID加一取模,并且不选择上一次的broker,通过ThreadLocal保证线程安全

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            //当index=Integer.MIN_VALUE时,Math.abs是负数,仍为MIN_VALUE
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

失败延迟机制:每次发送成功或是异常,都会记录发送时间,并根据发送时间计算broker不可用时间戳,如果发送成功但是大于等于550ms,会标记该broker 30s、60s、120s、180s、600s不可用,如果发送失败,会标记该broker 15分钟不可用,这段时间内不再向该broker发送消息

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        //如果发送成功,isolation是false,发送失败为true,会标记该broker 15分钟不可用
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        //latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        //notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容