在 RocketMQ 中,提供定时消息的功能。
定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:
延迟级别 | 时间 | 延迟级别 | 时间 | 延迟级别 | 时间 |
---|---|---|---|---|---|
1 | 1s | 7 | 3m | 13 | 9m |
2 | 5s | 8 | 4m | 14 | 10m |
3 | 10s | 9 | 5m | 15 | 20m |
4 | 30s | 10 | 6m | 16 | 30m |
5 | 1m | 11 | 7m | 17 | 1h |
6 | 2m | 12 | 8m | 18 | 2h |
如果想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列)。
本文代码基于上一篇的RocketMQ快速入门进行增改。继续使用 sca-stream-rocketmq-consumer 项目消费消息,producer项目进行功能增加。
2.1 Demo01Controller
修改 [Demo01Controller]类,增发送定时消息的 HTTP 接口。代码如下:
@GetMapping("/send_delay")
public boolean sendDelay() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <1> 设置延迟级别为 3,10 秒后消费。
.build();
// 发送消息
boolean sendResult = mySource.erbadagangOutput().send(springMessage);
logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
return sendResult;
}
在<1>
处,通过添加头MessageConst.PROPERTY_DELAY_TIME_LEVEL
,设置消息的延迟级别,从而发送定时消息。
1.2 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_delay 接口,发送延迟 10 秒的定时消息。IDEA 控制台输出日志如下:
// Producer 的控制台,在14:46:15发送了消息。
2020-08-06 14:46:15.855 INFO 1264 --- [io-18080-exec-1] c.e.s.s.r.p.controller.Demo01Controller : [sendDelay][发送消息完成, 结果 = true]
// Consumer 的控制台,在10秒后的14:46:26消费了消息。
2020-08-06 14:46:26.034 INFO 11212 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][线程编号:79 消息内容:Demo01Message{id=-229705646}]
预期。在 Producer 发送的消息之后,Consumer 确实 10 秒后才消费消息。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。
本文源代码