延迟任务是啥
我们把需要延迟执行的任务叫做延迟任务。也就是说当发生某个事件之后或者之前的某个特定的时间点执行的一系列动作。
延迟任务的使用场景有以下这些:
- 红包 24 小时未被查收,需要延迟执行退还业务;
- 预定会议之后,在会议开始之前的10分钟发送信息通知参会人员;
- 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。
延迟任务的特点有以下这些:
- 数据量大
- 时效性高
- 满足最少被消费一次语义
1、基于redis的zset实现
Redis实现延时任务,是通过其数据结构ZSET来实现的。ZSET会储存一个score和一个value,可以将value按照score进行排序。
延时任务的实现分为以下几步来实现:
(1) 将任务的执行时间作为score,要执行的任务数据作为value,jobId+topicName+groupId+delayTime作为key,通过zadd命令将数据存放在zset中;
(2) 用一个进程定时查询zset的score分数最小的元素,可以用ZRANGEBYSCORE key -inf +inf limit 0 1 withscores命令来实现;
(3) 如果最小的分数小于等于当前时间戳,就将该任务取出来执行并使用zrem原子命令删除数据,否则休眠一段时间后再查询。
redis的ZSET是通过跳跃表来实现的,复杂度为O(logN),N是存放在ZSET中元素的个数。用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。
优点:
1、Redis zset支持高性能的 score 排序。
2、Redis可以动态扩缩容,当消息很多时候,我们可以用集群来提高消息处理的速度,满足容量和性能上的可扩展性。
3、Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性。
4、简单实用,快速落地。
缺点:
1、为了避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为O(logN)),改进的办法是,将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上,再开启多个消费线程进行消费,提供吞吐量。
2、没有ack机制,消息存在丢失的可能性。
3、因为是通过定时轮询的方式拉取redis zset中的数据,所以存在一定的时间差,可以通过缩短轮询时间来较少时间差,但是频繁的轮询会造成CPU的浪费,可以通过wait/notify的方式解决该问题。
4、需要实现发送失败自动重试机制。
参考链接:
1、有赞开源实现:https://tech.youzan.com/queuing_delay/
2、美图开源实现:https://zhuanlan.zhihu.com/p/94082947
2、基于RabbitMQ实现
RabbitMQ 本身并不直接提供对延迟队列的支持,我们依靠 RabbitMQ 的TTL以及死信队列功能,来实现延迟队列的效果。
死信队列实际上是一种 RabbitMQ 的消息处理机制,当 RabbmitMQ 在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:
- 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
- 消息超时未消费,也就是 TTL 过期了
- 消息队列到达最大长度
消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。
消息生存时间 TTL
TTL(Time-To-Live)是 RabbitMQ 的一种高级特性,表示了一条消息的最大生存时间,单位为毫秒。如果一条消息在 TTL 设置的时间内没有被消费,那么它就会变成一条死信,进入我们上面所说的死信队列。
有两种不同的方式可以设置消息的 TTL 属性,一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列;另一种方式是针对单条消息设置,不过需要注意的是,使用这种方式设置的 TTL,消息可能不会按时死亡,因为 RabbitMQ 只会检查第一个消息是否过期。比如这种情况,第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。在RabbitMQ的3.5.8版本以后,我们就可以使用官方推荐的 rabbitmq delayed message exchange 插件很方便地实现延迟消息的功能。
优点:
1、息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
2、通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。
缺点:
1、需要自己搭建和运维集群。
3、基于RocketMQ实现
rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息处理的顺序性,可以让同一个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,保证信息处理的先后顺序性。)。之后,通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行处理。
注意 :目前RocketMQ只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时设置。
优点:
1、分布式、高吞吐量、高性能、高可靠。
缺点:
1、需要自己搭建和运维集群。
2、只支持特定的延时时间段。
4、基于ActiveMQ实现
ActiveMQ在5.4及以上版本开始支持持久化的延迟消息功能,甚至支持Cron表达式。默认是该功能是不开启的,如果开启需要修改配置文件activemq.xml,在broker节点上把schedulerSupport属性设置为true。
优点:
1、支持cron表达式,更灵活。
缺点:
1、需要自己搭建和运维集群。
5、其他
数据量少的话可以尝试quartz、delayQueue、TimeWheel (时间轮)等方案,但是为了保证数据不丢失,需要借助第三方持久化存储系统,例如rocksDB等。