RocketMQ 延迟消息

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。

预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间。broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

broker 处理延迟消息

CommitLog.putMessage()

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 设置消息的存储时间
    msg.setStoreTimestamp(System.currentTimeMillis());
    // 设置消息体的校验位
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    // 获取消息的 SysFlag 
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 1、非事务消息 或 已commit事物消息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 2、判断消息是否设置延迟
        if (msg.getDelayTimeLevel() > 0) {
            // 3、判断设置的延迟等级是否大于最大级别,如果大于最大值,则设置最大值(默认最大级别为18)
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 4、延迟消息的 Topic 名称为 “SCHEDULE_TOPIC_XXXX”
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 5、根据延迟级别获取对应的 Queue 。一个延迟级别对应一个 Queue
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 6、消息原始的 Topic 名称和 QueueId 备份保存到 property 中
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 7、修改消息的 topic 和 queueId,让该消息先投递到延迟消息队列中
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // 省略代码
    ........
}

1、判断该消息类型,如果是非事物消息或事物已commit消息,才能处理延迟消息。
2、判断该消息是否设置延迟,如果延迟级别大于零,则说明该消息时延迟消息。
3、判断设置的延迟等级是否大于最大级别,如果大于最大值,则设置最大值(默认最大级别为18)
4、延迟消息的 Topic 名称为 “SCHEDULE_TOPIC_XXXX”
5、根据延迟级别获取对应的 Queue 。一个延迟级别对应一个 Queue
6、消息原始的 Topic 名称和 QueueId 备份保存到 property 中
7、修改消息的 topic 和 queueId,让该消息先投递到延迟消息队列中

延迟消息级别

MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

解析初始化延迟级别

// 存储消息级别对应的延迟时间
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

// 解析并初始化消息延迟级别
public boolean parseDelayLevel() {
    // 时间单位
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    // 获取 messageDelayLevel 定义的延迟消息信息
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

解析messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 字符串,并每一个延迟时间对应一个延迟级别,存储到 delayLevelTable 中。

用户只需要设置延迟级别,然后通过 delayLevelTable 就知道该级别对应的延迟时间是多少。

处理延迟消息

public void start() {
    // 为每一个延迟级别设置一个定时任务处理消息的投递
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }
    // 定时持久化 每个消息级别处理对应queue的offset信息
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

1、为每一个延迟队列创建一个定时任务,定时处理延迟队列中的数据,把该数据从延迟队列中取出,然后投递到实际发送的消息队列(queue)中。

2、定时持久化每个消息级别处理对应queue的offset信息。(启动后延迟10秒开始持久化,以后每间隔10秒保存一次)

延迟消息投递

在 DeliverDelayedMessageTimerTask 中处理延迟消息的投递,代码如下:

public void executeOnTimeup() {
    // 根据 topic 和 queueId 获取延迟队列对应的 ConsumeQueue
    ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        // 通过偏移量获取延迟队列 MappedFile (MappedFile 对应的 Buffer)
        // ConsumerQueue 中每个消息存储的长度为20位,而 offset 是消息的个数,实际的偏移量为 offset * 20
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // bufferCQ.getSize() 为延迟队列中可以读取到的延迟消息长度(包括已到时间和未到实际的数据)
                // ConsumeQueue.CQ_STORE_UNIT_SIZE 为20。 ConsumerQueue 中每个消息固定的长度。
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 从ConsumerQueue 中获取一条消息。
                    // 消息包括3部分:物理偏移量、消息大小、Tag的HashCode
                    // 这里的tagsCode在延迟消息队列中存储是存储在 【延迟队列中的时间 + 延迟的时间】(通过这个时间来确定消息是否达到延迟的时间)
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    // 通过 tagsCode 来判断是否存储的是延时时间
                    // 如果是 Tag 的 hashcode ,那么最大值为 Integer.Max
                    // 如果是 延迟时间,时间为long类型,肯定大于 Integer.Max
                    if (cq.isExtAddr(tagsCode)) {
                        //获取延迟发送时间
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        }
                        // 从commitLog中获取存储时间,然后从新计算延迟发送时间。延迟时发送时间=消息发送到延迟队列存储时间+延迟时间
                        else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                    tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 计算投递时间,如果已经到投递时间,则返回当前时间,否则返回需要等待投递的时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;
                    // countdown <=0 是需要马上投递的延迟消息
                    if (countdown <= 0) {
                        // 从 CommitLog 中获取当前消息的信息
                        MessageExt msgExt =
                                ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // 这里从 property 中解析出正真的 Topic、QueueId、TagCode 信息,存储到 msgInner 中。
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                // 消息投递,跟 producer 发送消息处理流程一样。
                                PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);
                                // 如果处理成功,则继续下一条处理
                                if (putMessageResult != null
                                        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } 
                                // 如果处理失败
                                else {
                                    // 打印失败信息
                                    log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                            msgExt.getTopic(), msgExt.getMsgId());
                                    // 则从新创建一个定时任务
                                    ScheduleMessageService.this.timer.schedule(
                                            new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                    //并记录下处理延迟队列的 offset
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                            nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                log.error(
                                        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    }
                    // 未到投递时间
                    else {
                        // 重新创建一个定时任务,延迟 countdown 长时间在执行
                        ScheduleMessageService.this.timer.schedule(
                                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                countdown);
                        // 更新延迟队列待处理消息的 offset
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } 
    // 如果出现异常,则创建一个100毫秒延迟的定时任务
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
}

这里的注释已经写的很清楚了,就不解释了。

延迟消息 TagCode 值

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {
        // 省略代码
        ......

       // Timing message processing
        {
            String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                int delayLevel = Integer.parseInt(t);

                if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                }
                // 如果是延迟消息队列,则ConsumerQueue中的 tagsCode 存储的是要投递的时间(存储时间+延迟时间)
                if (delayLevel > 0) {
                    tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                }
            }
        //省略代码
        ......
}

从这里看出,如果是延迟消息,则 TagCode 中存储的是消息需要投递到正在消息队列的时间。而不是 Tag 的 hashcode 。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容

  • 用过rocketmq的童靴们肯定知道,它的延迟消息的时间不是任意时间片,而是仅支持18个固定的时间段,默认的配置是...
    划水者阅读 6,188评论 0 0
  • 系列 rocketMq概念介绍 rocketMq-namesrv介绍 rocketMq-Topic创建过程 roc...
    晴天哥_王志阅读 18,890评论 0 7
  • RocketMQ 是什么 Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟、高可靠、...
    whoami2019阅读 2,247评论 0 1
  • 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能...
    Sophie12138阅读 722评论 0 7
  • “扪心自问,如果你是别人,你愿意和自己搞对象么?” “想都不敢想,哪有这种福气!” 朋友圈抄来的段子,正版勿怪...
    李乐晨阅读 179评论 0 0