什么是延时队列
延迟队列首先它是一个队列,作为队列它的第一个特征是有序的,而之所以它被称为延时队列它还有一个更重要的特性就是延时。对于普通队列而言,如果有消费者订阅队列消费,则消费者可以立刻从队列中获取到元素。而作为延时队列,消费者订阅了该队列也无法从该队列中获取元素,必须延时结束才能从队列中取出元素。
延时队列的使用场景
延时队列的应用场景很多,举一个我们最常见的例子。例如在一个商场应用中,用户向服务器提交了订单未支付,对于还未超时的订单我们需要提醒用户订单还未支付,对于超时的订单我们需要将他们关闭。
上面的业务场景非常常见,我们可以通过定时扫描订单表找到需要提醒的订单和未支付的订单,然后对它们进行处理。但是这种方式有两个缺点,第一我们需要扫描整个订单库效率上不高,同时数据量上去之后对数据库的压力较大。第二它的时效性不强,因为我们是通过定时任务去发现需要处理的订单,如果定时任务间隔太久就会导致不能及时处理订单。如果定时任务太频繁,对服务器的性能压力太大。总结来说通过定时任务扫表的方式优点就是足够简单,当是它的致命确定就是效率太低。
Java中提供了DelayedQueue,它可以实现延时队列的效果,它解决了上面效率低和延迟的缺点。但是它也不是完美无缺的,使用Java中提供的延迟队列我们需要考虑队列元素数据的持久化。另一个点在于它的数据存在JVM的内存中,如果数据量太大它会导致应用占用的内存过大。
而我们可以通过RabbitMQ来实现延迟队列,它不仅解决了定时扫表方案的效率低和延迟大的缺点,同时还解决了DelayedQueue需要处理数据持久化等繁琐的问题。
RabbitMQ中如何配置延时队列
RabbitMQ中可以使用两种方式来实现延迟队列,其一是通过在Queue上设置TTL来实现,另外一种则是通过在消息中设置expiration来实现。不过仅依靠这两种方式还不能实现延时队列,还需要配合死信队列才行。关于什么是死信队列可以参考《RabbitMQ中的死信队列》该文中的介绍。
设置队列的TTL
在定义队列时,我们可以在队列的定义中加入x-message-ttl参数,当消息被推送到该队列,如果该队列上没有消费者消费,到了x-message-ttl设定的超时时间,消息会从队列中移除。如果队列上有消费者消费,它并不会等到x-message-ttl设定的时间超时才从队列中移除,而是立马被订阅的消费者消费。
所以我在前面说过,我们需要定义一个死信队列,即在队列中增加x-dead-letter-exchange和x-dead-letter-routing-key参数,而我们的消费者不订阅该队列,而是订阅死信队列。这样当消息到了TTL设定的时间时,它会被推送到死信队列,而我们的消费者订阅了死信队列。正是通过这种方式,我们实现了延时队列的效果。
/**
* 正常的业务Exchange和Queue
*/
channel.exchangeDeclare(Config.ORDER_EXCHANGE, BuiltinExchangeType.DIRECT, true);
Map<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange", Config.DELAYED_EXCHANGE);
arg.put("x-dead-letter-routing-key", Config.DELAYED_ROUTING_KEY);
arg.put("x-message-ttl",10000);
channel.queueDeclare(Config.ORDER_QUEUE, true, false, false, arg);
channel.queueBind(Config.ORDER_QUEUE,Config.ORDER_EXCHANGE,Config.ORDER_ROUTING_KEY);
/**
* 死信Exchange和Queue
*/
channel.exchangeDeclare(Config.DELAYED_EXCHANGE, BuiltinExchangeType.DIRECT,true);
channel.queueDeclare(Config.DELAYED_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.DELAYED_QUEUE,Config.DELAYED_EXCHANGE,Config.DELAYED_ROUTING_KEY);
上面的代码用图表示如下:
客户端将消息推送到A部分的交换机(order.exchange),然后通过路由键到队列(order.queue)。队列中的消息到了超时时间,则从A中的队列出队,进入死信队列。最后我们的消费者订阅B部分的队列(delayed.queue),从B队列部分获取消息。这就是整个消息延时队列的基本原理。
向A中的交换机发送消息
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生产者发送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, new AMQP.BasicProperties(), msg);
消费者订阅B部分的队列
channel.basicConsume(Config.DELAYED_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
String msg = new String(body, StandardCharsets.UTF_8);
log.info("消费者收到消息:{},当前时间:{}", msg, content);
}
});
测试生产者打印日志如下:
生产者发送消息:2021-05-12 10:01:31
消费者打印日志如下:
消费者收到消息:2021-05-12 10:01:31,当前时间:2021-05-12 10:01:41
通过在队列中设置x-message-ttl配合死信队列可以实现延时队列的功能,但是它存在一个缺陷。我们的超时时间是设置在队列上的,如果我现在有多个不同时长的延时的需求,使用这种方式实现起来就比较麻烦。对于不同时长的延时,我需要设置对应的队列,如果每个消息都有自己的延时时间,总不能每个消息给他创建一个队列吧。
设置消息的expiration
上面我们说了在队列中设置TTL的缺陷,而RabbitMQ对此作了进一步的优化。我们不仅可以将超时时间设置在队列中,还可以设置在消息上,只需要在消息的properties中添加expiration即可。如果即在队列中设置了x-message-ttl,又在消息上设置了expiration,应用时都是以最小值为准的。
还是之前的代码,修改生产者相关代码,在消息的properties中添加expiration为5000,代码如下:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生产者发送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("5000").build();
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);
运行程序,生产者打印日志如下:
生产者发送消息:2021-05-12 10:30:40
消费者打印日志如下:
消费者收到消息:2021-05-12 10:30:40,当前时间:2021-05-12 10:30:45
但是这种方式也存在缺陷,如果我向队列中依次推入两条消息,第一条消息的expiration为5000,第二条消息的expiration为3000,代码如下:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
log.info(content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayedTime * 1000)).build();
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);
}
运行程序,生产者打印日志内容如下:
消息时间:[2021-05-12 10:46:17],延时[5]s
消息时间:[2021-05-12 10:46:17],延时[3]s
消费者打印日志如下:
消费者收到消息:消息时间:[2021-05-12 10:46:17],延时[5]s,当前时间:2021-05-12 10:46:22
消费者收到消息:消息时间:[2021-05-12 10:46:17],延时[3]s,当前时间:2021-05-12 10:46:22
从打印的日志可以看出,第二条消息不是延时3秒后才会消费,而是延时了5秒。这是因为RabbitMQ只会检查第一个消息是否过期,如果过期了就会将数据丢到死信队列,而第二个消息即使已经过期了,RabbitMQ也不会将它丢到死信队列。这就导致了第二个消息我们设置的3秒最后变成了5秒。
rabbitmq-delayed-message-exchange插件
通过上面我们了解了可以通过设置队列或者消息的超时时间来实现延时队列的功能,但是他们或多或少都有部分缺陷。对于在队列上设置超时时间来说,如果延时的时间不固定处理起来比较麻烦。对于在消息中设置超时时间的方式来说,它会受到队列中第一个消息的影响,导致消息已经超时但是还在队列中无法出队。
还好在RabbitMQ中提供了一个rabbitmq-delayed-message-exchange插件,通过这个插件就能解决上面的问题。
插件安装
该插件不是RabbitMQ本身就有的插件,我们需要先去下载该插件,然后将其放入到RabbitMQ安装目录下的plugins目录中,然后执行下面命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果一切正常的话会显示安装成功,然后重启RabbitMQ服务即可。
如何使用
如何使用也比较简单,首先我们定义一个Exchange,这个Exchange的类型为x-delayed-message,这个类型就是插件提供的一个类型。这个类型不是RabbitMQ默认的那几种类型之一,而原先的Exchange类型变成在arguments中设置x-delayed-type。相关代码如下所示:
Map<String,Object> argMap = new HashMap<>();
argMap.put("x-delayed-type","direct");
channel.exchangeDeclare(Config.PLUGINS_EXCHANGE,"x-delayed-message",true,false,argMap);
channel.queueDeclare(Config.PLUGINS_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.PLUGINS_QUEUE,Config.PLUGINS_EXCHANGE,Config.PLUGINS_ROUTING_KEY);
这里面定义队列和绑定与正常使用没有差别。想要实现延时效果,我们只需要在发送消息时在headers中添加x-delay即可,如果不需要实现延时效果不设置即可。
List<Integer> delayedTimes = Arrays.asList(5, 2, 3, 4, 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
log.info(content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
Map<String,Object> headers = new HashMap<>();
headers.put("x-delay",delayedTime * 1000);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(Config.PLUGINS_EXCHANGE, Config.PLUGINS_ROUTING_KEY, properties, msg);
}
上面示例代码中,生产者向Exchange推送5条消息,且每条消息的延时时间是不同的,最后我们添加消费者代码如下:
channel.basicConsume(Config.PLUGINS_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
String msg = new String(body, StandardCharsets.UTF_8);
log.info("消费者收到消息:{},当前时间:{}", msg, content);
}
});
对于消费者代码来说,与正常的方式没有区别。最后生产者和消费者打印的日志如下:
消息时间:[2021-05-12 13:43:56],延时[5]s
消息时间:[2021-05-12 13:43:56],延时[2]s
消息时间:[2021-05-12 13:43:56],延时[3]s
消息时间:[2021-05-12 13:43:56],延时[4]s
消息时间:[2021-05-12 13:43:56],延时[1]s
消费者打印的日志如下:
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[1]s,当前时间:2021-05-12 13:43:58
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[2]s,当前时间:2021-05-12 13:43:58
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[3]s,当前时间:2021-05-12 13:44:00
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[4]s,当前时间:2021-05-12 13:44:01
消费者收到消息:消息时间:[2021-05-12 13:43:56],延时[5]s,当前时间:2021-05-12 13:44:01
从打印结果可以看出,生产者生产的第一条消息需要延时5秒再从队列中出队,但是它并没有影响后续时间短的出队,实现了我们想要的效果。
机制
其实该插件的机制比较好理解,也不需要借助死信队列来完成,相对于设置队列超时和消息超时的方式来说更加简单。当Exchange接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间。如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
总结
简单来说使用RabbitMQ来实现延时队列我们可以在不使用插件的情况下通过TTL+死信队列来实现,不过这种实现有部分局限性。如果有必要,我们可以通过安装插件的机制来解决这部分局限性。在实际应用中,我们可以根据自己的需求来决定使用哪种方案。
相关资源
文中示例代码:https://gitee.com/zengchao_workspace/rabbit-mq-demo
官网关于TTL的介绍:Time-To-Live and Expiration
rabbitmq-delayed-message-exchange插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange