[ RocketMQ源码阅读 8 ] 延迟消息实现机制-非TimeWheel实现

RocketMQ采用了后台定时任务,定时轮询延迟消息队列来实现。

先介绍一下延迟队列用到的数据结构,所有的消息都存储在名为SCHEDULE_TOPIC_XXXX的Topic中。每个delayLevel都对应一个单独的queue。这里delayLevel是个关键的概念,也就是延迟消息延迟的粒度。默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”

还有个关键的配置文件,位于RocketMQ存储目录的 root/config/delayOffset.json,该文件存储了延迟消息当前发送的进度。

延迟消息发送流程
  1. 客户端发送消息给Broker,Topic为SCHEDULE_TOPIC_XXXX,queueId按照delayLevel进行分配
  2. Broker将消息写入存储
  3. ScheduleMessageService定时读取SCHEDULE_TOPIC_XXXX存储的延迟消息,并判断是否到了该消费的时间
  4. 如果到了消费的时间点,那么通过消息的Properties获取最终投放的Topic和Queue,再次写入存储
  5. Consumer消费到该延迟消息
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容