定时消息:Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
延迟消息:Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
定时消息与延迟消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
延时队列的应用
什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的:
订单成功后,在30分钟内没有支付,自动取消订单
用户未完成注册,平台发送短信通知。
如果订单一直处于未完结状态,及时关闭订单,并退还库存
如果新建商户一个月内还没上传商品信息,将冻结商铺等
完成测评,或完成交易后的评价等。
预定工作会议,并通知所有参会人员等。
首先,在我们的系统范围内,支持任意延迟的消息指的是:
精度支持到秒级别
最大支持30天的延迟
下文我们讨论几种常用的方案。
一:Quartz定时任务
Quartz一款非常经典任务调度框架,定时任务它有一定的周期性。
引入Quartz框架依赖包:
org.springframework.bootspring-boot-starter-quartz
在启动类中使用@EnableScheduling注解开启定时任务功能。
@EnableScheduling@SpringBootApplicationpublic class DelayqueueApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(DelayqueueApplication.class, args); }}
编写定时任务,每个5秒执行一次。
@ComponentpublicclassQuartzDemo{//每隔五秒@Scheduled(cron ="0/5 * * * * ? ")publicvoidprocess(){System.out.println("执行定时任务:每5秒输出日志==="); }}
cron方案是很常见的一种方案,但有以下几个问题:
(1)轮询效率比较低
(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑
(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时
(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显
如何利用“延时消息”,对于每个任务只触发一次,保证效率的同时保证实时性,是今天要讨论的问题。
二:JDK 实现
我们在前文也有讨论过此方案。有兴趣的小伙伴可查阅。Redisson 源码分析及实际应用场景介绍 https://www.cnblogs.com/yizhiamumu/p/16706048.html
JDK中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue。
DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。
队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。
先简单实现看看效果。我们添加三个order入队DelayQueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。
要实现DelayQueue延时队列,要实现 Delayed 接口,这接口里只有一个getDelay方法,用于设置延期时间。
Order类中compareTo方法负责对队列中的元素进行排序。
getDelay主要是计算返回剩余时间,单位时间戳(毫秒)延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
compareTo主要是自定义实现比较方法返回 1 0 -1三个参数
publicclassOrderimplementsDelayed{/**
* 延迟时间
*/@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")privatelongtime; String name;publicOrder(String name,longtime, TimeUnit unit){this.name = name;this.time = System.currentTimeMillis() + (time >0? unit.toMillis(time) :0); }@OverridepubliclonggetDelay(TimeUnit unit){// 计算返回剩余时间。如果返回的是负数则说明到期,否则还没到期returntime - System.currentTimeMillis(); }@OverridepublicintcompareTo(Delayed o){// compareTo主要是自定义实现比较方法返回 1 0 -1三个参数 OrderOrder=(Order) o;longdiff=this.time - Order.time;if(diff <=0) {return-1; }else{return1; } }}
DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。
DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。
publicclassDelayQueueDemo{publicstaticvoidmain(String[] args)throwsInterruptedException {OrderOrder1=newOrder("Order1",5, TimeUnit.SECONDS);OrderOrder2=newOrder("Order2",10, TimeUnit.SECONDS);OrderOrder3=newOrder("Order3",15, TimeUnit.SECONDS); DelayQueue delayQueue =newDelayQueue<>(); delayQueue.put(Order1); delayQueue.put(Order2); delayQueue.put(Order3); System.out.println("======订单延迟队列开始时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));while(delayQueue.size() !=0) {/**
* 取队列头部元素是否过期
*/Ordertask=delayQueue.poll();if(task !=null) { System.out.format("------订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } Thread.sleep(1000); } }}
注:实际开发中会有专门的线程,负责消息的入队与消费。
执行后看到结果如下
订单延迟队列开始时间:2022-09-2614:59:09订单:{Order1}被取消, 取消时间:{2022-09-2614:59:14}订单:{Order2}被取消, 取消时间:{2022-09-2614:59:19}订单:{Order3}被取消, 取消时间:{2022-09-2614:59:24}
Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,我们用DelayQueue实现了延时队列。
三:Redis sorted set
在前文,我们总结过redis 的数据结构及其应用场景。
参考: 总结篇4:redis 核心数据存储结构及核心业务模型实现应用场景 https://www.cnblogs.com/yizhiamumu/p/16566540.html
参考:redis zset 使用场景 https://www.cnblogs.com/yizhiamumu/p/16736456.html
Redis的数据结构Zset,zset底层的数据结构是skipList,最底层链表有序。
这里主要利用它的score属性,通过score来为集合中的成员进行从小到大的排序。
score作为时间戳,自动按照时间最近的进行排序,启一个线程持续poll并设置park时间,完成延迟队列的设计,可参考Executors.newScheduledThreadPool中的DelayedWorkQueue
通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素过期的时间;向delayqueue添加三个order1、order2、order3,分别是10秒、20秒、30秒后过期。
zadd delayqueue 3 order3
消费端轮询队列delayqueue,将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key。
/**
* 消费消息
*/publicvoidpollOrderQueue(){while(true) { Setset= jedis.zrangeWithScores(DELAY_QUEUE,0,0); Stringvalue= ((Tuple)set.toArray()[0]).getElement();intscore = (int) ((Tuple)set.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance();intnowSecond = (int) (cal.getTimeInMillis() /1000);if(nowSecond >= score) { jedis.zrem(DELAY_QUEUE,value); System.out.println(sdf.format(newDate()) +" removed key:"+value); }if(jedis.zcard(DELAY_QUEUE) <=0) { System.out.println(sdf.format(newDate()) +" zset empty ");return; } Thread.sleep(1000); }}
我们看到执行结果符合预期:
2022-09-26 13:24:09 add finished.
2022-09-26 13:24:19 removed key:order1
2022-09-26 13:24:29 removed key:order2
2022-09-26 13:24:39 removed key:order3
2022-09-26 13:24:39 zset empty
这在高并发条件下,多消费者会取到同一个订单号。
解决方案
(1)用分布式锁。
(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据
Long num = jedis.zrem("OrderId", orderId);if( num != null && num>0){ xxx }
四:Redis 的键空间机制提供过期回调实现
Redis的key过期回调事件,也能达到延迟队列的效果。
该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。
简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。
修改redis.conf文件开启notify-keyspace-events Ex。
notify-keyspace-eventsEx
Redis监听配置,注入Bean RedisMessageListenerContainer。
@ConfigurationpublicclassRedisListenerConfig{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactory connectionFactory){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory);returncontainer;}}
编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。
@ComponentpublicclassRedisKeyExpirationListenerextendsKeyExpirationEventMessageListener{publicRedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer){super(listenerContainer);}@OverridepublicvoidonMessage(Message message,byte[] pattern){StringexpiredKey=message.toString(); System.out.println("======监听到key:"+ expiredKey +"已过期");}}
到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key并给定3s的过期时间。
setamumu 123 ex 3
在控制台成功监听到了这个过期的key。
监听到过期的key为:amumu
ps:redis的pub/sub机制存在一个硬伤,官网内容如下
原文:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.
翻译 : Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。方案的可靠性不高,因此,方案不推荐。
优缺点
优点:
(1)由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。
(2)做集群扩展相当方便
(3)时间准确度高
缺点:需要额外进行redis维护
五:使用MQ 实现延时队列的方案选择
1 RabbitMQ 实现延迟队列
RabbitMQ具有以下两个特性,可以实现延迟队列
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。
实际上RabbitMQ自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
Time To Live(TTL):指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
Dead Letter Exchanges(DLX):即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
优缺点
优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点:本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高。
2 RocketMQ 实现延迟队列
RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。
RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
本着对自己的高要求,我们并不满足于开源RocketMQ的18个Level的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在哪里呢?
未完待续xxx
六:时间轮算法思路
先上一张时间轮的图(
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)
ticksPerWheel:槽位数
tick:每个槽位的时间间隔。
假设这个延迟时间为X秒,那么X%(ticksPerWheel * tick)可以计算出X所属的TimeWheel中位置
未完待续xxx