RocketMQ源码分析----延时消息和重试消息的实现

延时消息的例子

和普通消息发送没多大区别,只是多了个句设置延时级别的代码

    ....
    Message msgToBroker = new Message("topic_test", tag, id, body);
    msgToBroker.setDelayTimeLevel(3);
    ....

level为3那么代表10s后消费者10s后能拉到这条消息,消费者端的代码也和普通的消费一样,其实逻辑都在Broker端,接下来看下broker的处理逻辑

存储延时消息

了解普通消息存储原理的应该知道CommitLog的putMessage方法是处理存储的逻辑,延时消息的存储也是这里实现的,看下putMessage里有段代码如下:

        if (tranType == MessageSysFlag.TransactionNotType//
                || tranType == MessageSysFlag.TransactionCommitType) {
            // 如果设置了延时级别
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延时消息的topic为SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                // qid为延时级别-1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // 将原有的topic和qid放到properties属性中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 更新延时消息的topic和qid
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

一开始我也没想到延时消息是怎么实现了,但是看到这,一下就恍然大悟了。我们发送了一个延时消息,其实这条消息在Broker里就被改变了topic和qid,消费者消费不到,是因为topic改变了啊,肯定消费不到啊。那么问题又来了:

  • 这时候消费不到是正常的,因为没有订阅这个topic,那么为什么时间到了之后就消费到了呢?

延时消息处理

核心逻辑都在这个ScheduleMessageService里,看了ScheduleMessageService的代码,就知道上面那个问题的答案了。
说句题外话,我是怎么找到ScheduleMessageService的呢?其实读其他源码的时候也是类似的,像这种异步的操作,是怎么找到实现的地方的?个人经验,一般有几个方法吧

  1. 通过某些关键字去搜索对应的类,例如这里这个SCHEDULE_TOPIC和ScheduleMessageService.delayLevel2QueueId,通过这两个,我找到了ScheduleMessageService,发现里面是延时消息的处理逻辑
  2. 先百度一下延时消息的原理分析,那么会发现分析ScheduleMessageService的文章,那么不看文章,自己代码去找ScheduleMessageService这类,自己debug

看下start方法

    public void start() {
        // 为每个延时队列增加定时器
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            //16个延时级别,每个级别都有个定时器去扫描消息
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // 定时将延时进度刷盘
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Exception e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

offsetTable保存了每个延时级别的已经处理到哪个位置
DeliverDelayedMessageTimerTask的run方法调用了executeOnTimeup方法

    public void executeOnTimeup() {
        // 通过topic+qid获取cq,代表了逻辑
        ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                        delayLevel2QueueId(delayLevel));
        long failScheduleOffset = offset;

        if (cq != null) {
            // 取出起始offset的对应的CQ的一个bytebuffer封装
            SelectMapedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
                        // cq的3个组成
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        long now = System.currentTimeMillis();
                        // 计算延时到期的时间
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                        nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
    
                        long countdown = deliverTimestamp - now;
                        // <=0代表已经到期了,可以做对应操作了  
                        if (countdown <= 0) {
                            // 因为cq可以看成是一个索引,取到cq,通过offsetPy和sizePy取获取真实的消息
                            MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                            offsetPy, sizePy);
                            // 取到了消息
                            if (msgExt != null) {
                                try {
                                    // msgExt这个消息是topic为SCHEDULE_TOPIC的消息,将其转换成真正的消息
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                    .putMessage(msgInner);
                                    if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                        continue;
                                    } else {
                                        // ....
                                        return;
                                    }
                                } catch (Exception e) {
                                    // ....error log
                                }
                            }
                        } else {
                            // ....
                            return;
                        }
                    } // end of for

                    nextOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {

                    bufferCQ.release();
                }
            } // end of if (bufferCQ != null)
            else {
                //....
            }
        } // end of if (cq != null)

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
    }

代码有点长,但是核心代码在countdown <= 0的这个if里,其他是异常处理流程,后面再说,总结一下主要的步骤

  1. 从上次处理完毕的offset中继续处理,获取对应的cq
  2. 通过tagsCode计算消息是否到达可消费的时候
  3. 通过cq的sizePy和offsetBy获取Message
  4. 将消息的topic和qid换成真实的消息topic和qid
  5. 写入commitLog
  6. 更新offsetTable

上面有几点需要说明一下:
第二点:还记得在构建CQ的时候吗,当时判断了如果是延时消息,那么将tagsCode换成时间戳,所以这里就可以直接根据tagsCode进行时间的时间了。本来tagsCode是tag的hashCode,普通消息用来过滤使用的,但是对于延时消息,他并不需要去比对tag,他需要的是比对时间是否到达指定时间,那么tagsCode就换成时间戳,方便过滤
第四点:看到这里整个延时消息就原理就是发送的时候替换topic写入commitlog,到达时间了,才换成真正的消息写入commitlog,那么消费者只有在时间到达,即真正消息写入的时候才能消费
第六点:当操作成功之后,会更新nextOffset,防止下次继续计算

延时消息异常处理

executeOnTimeup方法里,会看到很多else,或者有catch,这种属于异常情况或者消息还未到达时间的情况,我们一一来分析一下

时间未到

if (countdown <= 0) {
// ....
}else {
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
    return;
}

如果时间还未到达,那么countdown就是剩余的时间,这个时候,会再启动一个定时任务,在countdown时间后执行一遍

写入CommitLog失败

if (putMessageResult != null
                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
    // ....
} else {
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);
    ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);
    return;
}

和上面那种情况类似,不同的只是时间变成10s。出现这种情况的,可能是broker消息处理不过来了,出现pagecache繁忙,所以应该延长一点再去处理

根据offset取不到对应CQ文件对应的ByteBuffer

if (bufferCQ != null) {
} // end of if (bufferCQ != null)
else {
    /*
     * 索引文件被删除,定时任务中记录的offset已经被删除,会导致从该位置中取不到数据,
     * 这里直接纠正下一次定时任务的offset为当前定时任务队列的最小值
     */
    long cqMinOffset = cq.getMinOffsetInQuque();
    if (offset < cqMinOffset) {
        failScheduleOffset = cqMinOffset;
        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                + cqMinOffset + ", queueId=" + cq.getQueueId());
    }
}
if (cq != null) {
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);

这种情况根据offset去查找CQ,如果文件被删除了,会存在当前的CQminOffset大于当前传入的offset,则会返回null,那么纠正offset,下次从当前cq中最小的offset进行处理
<font color="red">目前还不知道哪种情况会文件被删除,而且这种情况且不是丢失了延时消息?</font>

消息重试

回顾一下消费者消费的时候,如果失败了会把消息发回Broker,Broker会将该消息topic设置为%RETRY%+group,而消费者在启动的时候,默认会订阅这个topic,那么就是说,消息重试则发送到重试的topic中,这样,消费者会去拉这个topic的消息,这样就完成了消息的重试,那么问题又来了

  • 如果仅仅是这样,那么重试的间隔是怎么回事?

细心观察的会发现,消息重试间隔是延时消息一毛一样,可以猜到可能就是延时消息的实现。

我们看下SendMessageProcessor处理消息重试的地方,请求Code为CONSUMER_SEND_MSG_BACK,broker执行的方法为consumerSendMsgBack,其中有几行代码如下:

        // ....
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        // ....
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
                || delayLevel < 0) {
            // ....
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

很熟悉吧,通过重试次数设置延迟级别,几时消费者订阅了重试topic,也是不能拉到消息的,因为消息被发送到了延时队列里去了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容