RocketMQ 定时消息

概述

     消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。比如:常见的场景电商交易中超时未支付关闭订单的场景,通过延时消息在30分钟后投递给消息端进行关单。
     开源 RocketMQ 针对目前只支持固定精度的定时消息。生产端发送消息,通过设delayTimeLevel时间级别后,可实现消息不立马被消费者消费到,而是按照18个级别 ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";)。比如支持5秒、10秒的Level,那么用户只能发送5秒延迟或者10秒延迟,不能发送8秒延迟的消息。

实现原理

Message转化

  1. Producer端设置定时级别,发送延时消息后。
  2. Broker端收到延时消息时,会提前将延迟消息的 topic 进行转化为进入 Topic 为 SCHEDULE_TOPIC_XXXX的消息,同时 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1。
 if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));     msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

任务检测

  1. 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。
  2. 对 SCHEDULE_TOPIC_XXXX 每条消费队列(一共18个) broker端都会启动了一个 timer和 timerTask的任务,默认1s执行一次 拉取数据,拉取过程 首先进行系统topic+delayLevel 查询ConsumeQueue ,然后对比每条消息的延时时间和当前时间对比,发送 到达投递时间【计划消费时间】 的消息。将消息进行转化并按照业务topic封装好Message写入到 commit_log 中。
  DeliverDelayedMessageTimerTask#run
  public void executeOnTimeup() {
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                }  
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) {   // 消息到达可发送时间
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) { // 将Message消息转化并将其写入
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                         
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        }  
                                    } catch (Exception e) {
                                        
                                    }
                                }
                            } else {  // 安排下一次任务
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } 

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                }  
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }
  1. 当延时消息写成功后,先更新延时消息消费进度(内存中),同时定时消息发送进度存储在文件(../config/delayOffset.json)里,每 10s 定时持久化发送进度。
  2. 若发送延时消息失败后,MQ会安排下一次任务继续发送直到成功。

优缺点

优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性

缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况

总结

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容