RocketMQ源码之selectOneMessageQueue选择队列

RocketMQ是通过MQFaultStrategy的selectOneMessageQueue方法来选择发送队列的

MQFaultStrategy

我们先来看下MQFaultStrategy中重要的属性

    //延迟容错对象,维护延迟Brokers的信息
    //key:brokerName
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    
    //延迟容错开关
    private boolean sendLatencyFaultEnable = false;
    
    //延迟级别数组
    private long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };

    //不可用时长数组
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

MQFaultStrategy中最重要的属性是latencyFaultTolerance,它维护了那些消息发送延迟较高的brokers的信息,同时延迟的时间长短对应了延迟级别latencyMax 和时长notAvailableDuration ,sendLatencyFaultEnable 控制了是否开启发送消息延迟功能。
来看主方法

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//判断是否开启了开关
        if (this.sendLatencyFaultEnable) {
            try {
//获取一个可用的并且brokerName=lastBrokerName的消息队列
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
//选择一个相对好的broker,不考虑可用性的消息队列
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
//随机选择一个消息队列
            return tpInfo.selectOneMessageQueue();
        }
//获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

我们来看开启了延迟容错的逻辑:

1.首先选择一个broker==lastBrokerName并且可用的一个队列(也就是该队列并没有因为延迟过长而被加进了延迟容错对象latencyFaultTolerance 中)
2.如果第一步中没有找到合适的队列,此时舍弃broker==lastBrokerName这个条件,选择一个相对较好的broker来发送
3.随机选择一个队列来发送

LatencyFaultToleranceImpl

selectOneMessageQueue选择队列的基本逻辑我们已经了解了,现在来具体看下LatencyFaultToleranceImpl是怎么来维护这些broker的可用性和延迟的呢?
主要属性faultItemTable 和内部类FaultItem

private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }
}

顾名思义这是一个延迟对象List,key为broker,value为FaultItem,FaultItem中存储了该broker的name,延迟界别和延迟开始的时间。

判断队列可用性方法如下:

public boolean isAvailable(final String name) {
     final FaultItem faultItem = this.faultItemTable.get(name);
     if (faultItem != null) {
         return faultItem.isAvailable();
     }
     return true;
}

如果faultItem 中不存在该broker,返回true,当存在时,还需判断isAvailable

public boolean isAvailable() {
     return (System.currentTimeMillis() - startTimestamp) >= 0;
}

如果延迟时间已过也返回true。

updateFaultItem

选择完队列后,执行发送步骤

//发送start时间
beginTimestampPrev = System.currentTimeMillis();
//发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
//发送结束时间
endTimestamp = System.currentTimeMillis();
//更新broker的延迟情况
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

我们可以看到这里计算了某个broker的发送时间,然后根据这个时间去更新FaultItem

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
}

private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
}

这里根据延迟时间对比MQFaultStrategy中的延迟级别数组latencyMax 不可用时长数组notAvailableDuration 来将该broker加进faultItemTable中。

总结

1.所有的broker延迟信息都会被记录
2.发送消息时会选择延迟最低的broker来发送,提高效率
3.broker延迟过高会自动减少它的消息分配,充分发挥所有服务器的能力

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容