RocketMQ 与 Spring Cloud Stream整合(二、定时消息)

在 RocketMQ 中,提供定时消息的功能。

定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:

延迟级别 时间 延迟级别 时间 延迟级别 时间
1 1s 7 3m 13 9m
2 5s 8 4m 14 10m
3 10s 9 5m 15 20m
4 30s 10 6m 16 30m
5 1m 11 7m 17 1h
6 2m 12 8m 18 2h

如果想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列)。

本文代码基于上一篇的RocketMQ快速入门进行增改。继续使用 sca-stream-rocketmq-consumer 项目消费消息,producer项目进行功能增加。

2.1 Demo01Controller

修改 [Demo01Controller]类,增发送定时消息的 HTTP 接口。代码如下:

    @GetMapping("/send_delay")
    public boolean sendDelay() {
        // 创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // 创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <1>  设置延迟级别为 3,10 秒后消费。
                .build();
        // 发送消息
        boolean sendResult = mySource.erbadagangOutput().send(springMessage);
        logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
        return sendResult;
    }

<1>处,通过添加头MessageConst.PROPERTY_DELAY_TIME_LEVEL,设置消息的延迟级别,从而发送定时消息。

1.2 简单测试

① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_delay 接口,发送延迟 10 秒的定时消息。IDEA 控制台输出日志如下:

// Producer 的控制台,在14:46:15发送了消息。
2020-08-06 14:46:15.855  INFO 1264 --- [io-18080-exec-1] c.e.s.s.r.p.controller.Demo01Controller  : [sendDelay][发送消息完成, 结果 = true]

// Consumer 的控制台,在10秒后的14:46:26消费了消息。
2020-08-06 14:46:26.034  INFO 11212 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:79 消息内容:Demo01Message{id=-229705646}]

预期。在 Producer 发送的消息之后,Consumer 确实 10 秒后才消费消息。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

本文源代码

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容