RocketMQ采用了后台定时任务,定时轮询延迟消息队列来实现。
先介绍一下延迟队列用到的数据结构,所有的消息都存储在名为SCHEDULE_TOPIC_XXXX的Topic中。每个delayLevel都对应一个单独的queue。这里delayLevel是个关键的概念,也就是延迟消息延迟的粒度。默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
还有个关键的配置文件,位于RocketMQ存储目录的 root/config/delayOffset.json,该文件存储了延迟消息当前发送的进度。
延迟消息发送流程
- 客户端发送消息给Broker,Topic为SCHEDULE_TOPIC_XXXX,queueId按照delayLevel进行分配
- Broker将消息写入存储
- ScheduleMessageService定时读取SCHEDULE_TOPIC_XXXX存储的延迟消息,并判断是否到了该消费的时间
- 如果到了消费的时间点,那么通过消息的Properties获取最终投放的Topic和Queue,再次写入存储
- Consumer消费到该延迟消息