RabbitMQ实现延时队列

什么是延时队列

延迟队列首先它是一个队列,作为队列它的第一个特征是有序的,而之所以它被称为延时队列它还有一个更重要的特性就是延时。对于普通队列而言,如果有消费者订阅队列消费,则消费者可以立刻从队列中获取到元素。而作为延时队列,消费者订阅了该队列也无法从该队列中获取元素,必须延时结束才能从队列中取出元素。

延时队列的使用场景

延时队列的应用场景很多,举一个我们最常见的例子。例如在一个商场应用中,用户向服务器提交了订单未支付,对于还未超时的订单我们需要提醒用户订单还未支付,对于超时的订单我们需要将他们关闭。

上面的业务场景非常常见,我们可以通过定时扫描订单表找到需要提醒的订单和未支付的订单,然后对它们进行处理。但是这种方式有两个缺点,第一我们需要扫描整个订单库效率上不高,同时数据量上去之后对数据库的压力较大。第二它的时效性不强,因为我们是通过定时任务去发现需要处理的订单,如果定时任务间隔太久就会导致不能及时处理订单。如果定时任务太频繁,对服务器的性能压力太大。总结来说通过定时任务扫表的方式优点就是足够简单,当是它的致命确定就是效率太低。

Java中提供了DelayedQueue,它可以实现延时队列的效果,它解决了上面效率低和延迟的缺点。但是它也不是完美无缺的,使用Java中提供的延迟队列我们需要考虑队列元素数据的持久化。另一个点在于它的数据存在JVM的内存中,如果数据量太大它会导致应用占用的内存过大。

而我们可以通过RabbitMQ来实现延迟队列,它不仅解决了定时扫表方案的效率低和延迟大的缺点,同时还解决了DelayedQueue需要处理数据持久化等繁琐的问题。

RabbitMQ中如何配置延时队列

RabbitMQ中可以使用两种方式来实现延迟队列,其一是通过在Queue上设置TTL来实现,另外一种则是通过在消息中设置expiration来实现。不过仅依靠这两种方式还不能实现延时队列,还需要配合死信队列才行。关于什么是死信队列可以参考《RabbitMQ中的死信队列》该文中的介绍。

设置队列的TTL

在定义队列时,我们可以在队列的定义中加入x-message-ttl参数,当消息被推送到该队列,如果该队列上没有消费者消费,到了x-message-ttl设定的超时时间,消息会从队列中移除。如果队列上有消费者消费,它并不会等到x-message-ttl设定的时间超时才从队列中移除,而是立马被订阅的消费者消费。

所以我在前面说过,我们需要定义一个死信队列,即在队列中增加x-dead-letter-exchangex-dead-letter-routing-key参数,而我们的消费者不订阅该队列,而是订阅死信队列。这样当消息到了TTL设定的时间时,它会被推送到死信队列,而我们的消费者订阅了死信队列。正是通过这种方式,我们实现了延时队列的效果。

/**
 * 正常的业务Exchange和Queue
 */
channel.exchangeDeclare(Config.ORDER_EXCHANGE, BuiltinExchangeType.DIRECT, true);
Map<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange", Config.DELAYED_EXCHANGE);
arg.put("x-dead-letter-routing-key", Config.DELAYED_ROUTING_KEY);
arg.put("x-message-ttl",10000);
channel.queueDeclare(Config.ORDER_QUEUE, true, false, false, arg);
channel.queueBind(Config.ORDER_QUEUE,Config.ORDER_EXCHANGE,Config.ORDER_ROUTING_KEY);
/**
 * 死信Exchange和Queue
 */
channel.exchangeDeclare(Config.DELAYED_EXCHANGE, BuiltinExchangeType.DIRECT,true);
channel.queueDeclare(Config.DELAYED_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.DELAYED_QUEUE,Config.DELAYED_EXCHANGE,Config.DELAYED_ROUTING_KEY);

上面的代码用图表示如下:


延迟队列TTL.jpg

客户端将消息推送到A部分的交换机(order.exchange),然后通过路由键到队列(order.queue)。队列中的消息到了超时时间,则从A中的队列出队,进入死信队列。最后我们的消费者订阅B部分的队列(delayed.queue),从B队列部分获取消息。这就是整个消息延时队列的基本原理。

向A中的交换机发送消息

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生产者发送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, new AMQP.BasicProperties(), msg);

消费者订阅B部分的队列

channel.basicConsume(Config.DELAYED_QUEUE, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String content = sdf.format(new Date());
        String msg = new String(body, StandardCharsets.UTF_8);
        log.info("消费者收到消息:{},当前时间:{}", msg, content);
    }
});

测试生产者打印日志如下:

生产者发送消息:2021-05-12 10:01:31

消费者打印日志如下:

消费者收到消息:2021-05-12 10:01:31,当前时间:2021-05-12 10:01:41

通过在队列中设置x-message-ttl配合死信队列可以实现延时队列的功能,但是它存在一个缺陷。我们的超时时间是设置在队列上的,如果我现在有多个不同时长的延时的需求,使用这种方式实现起来就比较麻烦。对于不同时长的延时,我需要设置对应的队列,如果每个消息都有自己的延时时间,总不能每个消息给他创建一个队列吧。

设置消息的expiration

上面我们说了在队列中设置TTL的缺陷,而RabbitMQ对此作了进一步的优化。我们不仅可以将超时时间设置在队列中,还可以设置在消息上,只需要在消息的properties中添加expiration即可。如果即在队列中设置了x-message-ttl,又在消息上设置了expiration,应用时都是以最小值为准的。

还是之前的代码,修改生产者相关代码,在消息的properties中添加expiration为5000,代码如下:

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生产者发送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("5000").build();
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);

运行程序,生产者打印日志如下:

生产者发送消息:2021-05-12 10:30:40

消费者打印日志如下:

消费者收到消息:2021-05-12 10:30:40,当前时间:2021-05-12 10:30:45

但是这种方式也存在缺陷,如果我向队列中依次推入两条消息,第一条消息的expiration为5000,第二条消息的expiration为3000,代码如下:

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
    String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
    log.info(content);
    byte[] msg = content.getBytes(StandardCharsets.UTF_8);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayedTime * 1000)).build();
    channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);
}

运行程序,生产者打印日志内容如下:

消息时间:[2021-05-12 10:46:17],延时[5]s
消息时间:[2021-05-12 10:46:17],延时[3]s

消费者打印日志如下:

消费者收到消息:消息时间:[2021-05-12 10:46:17],延时[5]s,当前时间:2021-05-12 10:46:22
消费者收到消息:消息时间:[2021-05-12 10:46:17],延时[3]s,当前时间:2021-05-12 10:46:22

从打印的日志可以看出,第二条消息不是延时3秒后才会消费,而是延时了5秒。这是因为RabbitMQ只会检查第一个消息是否过期,如果过期了就会将数据丢到死信队列,而第二个消息即使已经过期了,RabbitMQ也不会将它丢到死信队列。这就导致了第二个消息我们设置的3秒最后变成了5秒。

rabbitmq-delayed-message-exchange插件

通过上面我们了解了可以通过设置队列或者消息的超时时间来实现延时队列的功能,但是他们或多或少都有部分缺陷。对于在队列上设置超时时间来说,如果延时的时间不固定处理起来比较麻烦。对于在消息中设置超时时间的方式来说,它会受到队列中第一个消息的影响,导致消息已经超时但是还在队列中无法出队。

还好在RabbitMQ中提供了一个rabbitmq-delayed-message-exchange插件,通过这个插件就能解决上面的问题。

插件安装

该插件不是RabbitMQ本身就有的插件,我们需要先去下载该插件,然后将其放入到RabbitMQ安装目录下的plugins目录中,然后执行下面命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果一切正常的话会显示安装成功,然后重启RabbitMQ服务即可。

如何使用

如何使用也比较简单,首先我们定义一个Exchange,这个Exchange的类型为x-delayed-message,这个类型就是插件提供的一个类型。这个类型不是RabbitMQ默认的那几种类型之一,而原先的Exchange类型变成在arguments中设置x-delayed-type。相关代码如下所示:

Map<String,Object> argMap = new HashMap<>();
argMap.put("x-delayed-type","direct");
channel.exchangeDeclare(Config.PLUGINS_EXCHANGE,"x-delayed-message",true,false,argMap);
channel.queueDeclare(Config.PLUGINS_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.PLUGINS_QUEUE,Config.PLUGINS_EXCHANGE,Config.PLUGINS_ROUTING_KEY);

这里面定义队列和绑定与正常使用没有差别。想要实现延时效果,我们只需要在发送消息时在headers中添加x-delay即可,如果不需要实现延时效果不设置即可。

List<Integer> delayedTimes = Arrays.asList(5, 2, 3, 4, 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
    String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
    log.info(content);
    byte[] msg = content.getBytes(StandardCharsets.UTF_8);
    Map<String,Object> headers = new HashMap<>();
    headers.put("x-delay",delayedTime * 1000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
    channel.basicPublish(Config.PLUGINS_EXCHANGE, Config.PLUGINS_ROUTING_KEY, properties, msg);
}

上面示例代码中,生产者向Exchange推送5条消息,且每条消息的延时时间是不同的,最后我们添加消费者代码如下:

channel.basicConsume(Config.PLUGINS_QUEUE, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String content = sdf.format(new Date());
        String msg = new String(body, StandardCharsets.UTF_8);
        log.info("消费者收到消息:{},当前时间:{}", msg, content);
    }
});

对于消费者代码来说,与正常的方式没有区别。最后生产者和消费者打印的日志如下:

消息时间:[2021-05-12 13:43:56],延时[5]s
消息时间:[2021-05-12 13:43:56],延时[2]s
消息时间:[2021-05-12 13:43:56],延时[3]s
消息时间:[2021-05-12 13:43:56],延时[4]s
消息时间:[2021-05-12 13:43:56],延时[1]s

消费者打印的日志如下:

消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[1]s,当前时间:2021-05-12 13:43:58
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[2]s,当前时间:2021-05-12 13:43:58
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[3]s,当前时间:2021-05-12 13:44:00
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[4]s,当前时间:2021-05-12 13:44:01
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[5]s,当前时间:2021-05-12 13:44:01

从打印结果可以看出,生产者生产的第一条消息需要延时5秒再从队列中出队,但是它并没有影响后续时间短的出队,实现了我们想要的效果。

机制

其实该插件的机制比较好理解,也不需要借助死信队列来完成,相对于设置队列超时和消息超时的方式来说更加简单。当Exchange接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间。如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

总结

简单来说使用RabbitMQ来实现延时队列我们可以在不使用插件的情况下通过TTL+死信队列来实现,不过这种实现有部分局限性。如果有必要,我们可以通过安装插件的机制来解决这部分局限性。在实际应用中,我们可以根据自己的需求来决定使用哪种方案。

相关资源

文中示例代码:https://gitee.com/zengchao_workspace/rabbit-mq-demo

官网关于TTL的介绍:Time-To-Live and Expiration

rabbitmq-delayed-message-exchange插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容

  • 本人所在的公司在生产中使用 RabbitMQ 作为消息中间件,并在业务中用到了 RabbitMQ 的延时队列功能。...
    神秘的崔老师阅读 767评论 0 3
  • 首先,我们需要知道,Rabbit本身是不支持延时队列的。但是,它有一个死信投递机制,可以曲线救国来实现我们想要的延...
    丶Lukez阅读 1,388评论 0 2
  • 虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的 实现延时队列的基本要素 存在一个倒计时...
    _老七阅读 661评论 0 1
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,503评论 28 53
  • 信任包括信任自己和信任他人 很多时候,很多事情,失败、遗憾、错过,源于不自信,不信任他人 觉得自己做不成,别人做不...
    吴氵晃阅读 6,161评论 4 8