定时消息机制

概览

定时消息是指消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费。

broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

所有延迟消息都使用同一主题SCHEDULE_TOPIC_XXXX,这一主题下一个消息延迟级别,一个消息队列MessageQueue,一个定时任务TimerTask进行处理。如果消息的delayLevel大于0,代表延迟消息,首先将它的原来topic、queueId存入到消息属性中,然后改变消息的topic为SCHEDULE_TOPIC_XXXX、queueId为delayLevel-1这一队列中;然后存入到commitlog中。TimerTask执行定时任务,从延迟队列取出这个消息,根据topic,queueId获取consumequeue,从consumequeue中获取commitlog的message,再将message的REAL_TOPIC、REAL_QID属性进行topic、queueId还原,再次存入到commitlog文件中,等待消费者消费。

加载

延迟消费队列的消费进度定时会存储到 ${ROCKET HOME}rocketmq/store/config/delayOffset.json文件中,所以在项目启动时,需要加载历史消费记录;还要完成消息延迟延迟消息级别和延迟时间的delayLevelTable数据的构造;

delayOffset.json
键值对为queueId:offset
{
    "offsetTable":{12:0,6:0,13:0,5:1,18:0,3:22}
}

// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);
public boolean load() {
    boolean result = super.load();
    // delayLevelTable数据的构造
    result = result && this.parseDelayLevel();
    return result;
}

启动定时服务

  1. 为每一个延迟队列创建一个调度任务,每一个调度任务对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列;
  2. 启动定时任务,每隔10s持久化延迟消息队列消息进度。
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 定时任务,每一个定时任务启动时,首先延迟1s;以后每次调度时,首先延迟0.1s,然后再执行定时调度时间
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // Broker端默认10s持久化一次消息进度,存储文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTask继承TimerTask是延迟消息定时任务,一个延迟队列一个延时TimeTask任务。TimeTask定时时间机制为延迟1秒执行第一次,以后每0.1秒执行一次DeliverDelayedMessageTimerTask任务;也就是executeOnTimeup()方法,处理一个延迟级别的消息队列,根据延迟级别消息队列的消费offset值,取出在这个延迟级别消息队列offset后面的message,进行延迟时间判断,如果第一个Message的延迟时间不满足,那就继续提交这个DeliverDelayedMessageTimerTask定时任务,其他后面消息也不满足延迟时间,直接return;如果第一个Message到了延迟时间,还原真实的Topic、QueueId将消息再次放入commitlog中,进入真实的topic的队列。如果过程中出现了异常情况,也是重新提交这个DeliverDelayedMessageTimerTask定时任务。

@Override
public void run() {
    try {
        if (isStarted()) {
            // 
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

public void executeOnTimeup() {
    // 查找SCHEDULE_TOPIC_XXXX下延迟消息的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 先初始化失败调度offset为最开始offset,后面再更新
    long failScheduleOffset = offset;

    if (cq != null) {
        // 第一次获取时offset初始值为0;获取这个offset之后的所有message值
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                // ConsumeQueueExt 存储ConsumeQueue的扩展属性;CqExtUnit为扩展属性byte字节大小
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // 计算出一个message的offset,消息大小,tagHashCode
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 消息物理偏移量
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // 消息大小
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tagHashCode
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 计算是否到达定期时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 下一条消息在consumequeue中的offset大小,每个消息在consumequeue中存储大小为20
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 计算消息是否到期剩余时间
                    long countdown = deliverTimestamp - now;
                    // 到达消息指定延迟时间
                    if (countdown <= 0) {
                        // 根据物理offset、消息大小从commitlog获取消息
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);
                        // 获取到消息
                        if (msgExt != null) {
                            try {
                                // 还原成为原来的消息,还原topic,queueId
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // 重新存储消息到commitlog文件中
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);
                                // 存储成功,继续下一条消息
                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    // 失败记录log
                                    // 再次执行延时任务,更新不同延时级别对应的消费消息offset
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // 未到消息指定的延迟时间,继续构建一个延时任务放入定时任务中,更新等待时间
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                // 再次创建延时等待任务,等待下次延迟队列中的数据,进行处理。更新延迟队列消费的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                // 释放获取到的内存消息
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            //未找到有效的消息,更新延迟队列定时拉取进度
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    //未从commitlog中获取到延时消息,创建延迟任务,等待commitlog中消息的到来。
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容