
队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。
延时队列的使用场景
订单提交后一定时间内未支付需要自动取消。
接口调用失败后阶梯式的补偿调用。
任务超时提醒。
预定会议提前十五分钟通知与会人员参加会议。
延时队列常用实现方式
java DelayQueue延时队列
DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素,当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。

示例
队列中的元素必须实现Delayed接口
public class MeetingNotice implements Delayed {
private long noticeTime;
private long meetingId;
  
public MeetingNotice(long meetingId, long noticeTime, TimeUnit unit) {
    this.name = name;
    this.noticeTime = System.currentTimeMillis() + (noticeTime > 0 ? unit.toMillis(noticeTime) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
    return noticeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
    MeetingNotice meetingNotice = (MeetingNotice) o;
    long diff = this.time - meetingNotice.time;
    if (diff <= 0) {
        return -1;
    } else {
        return 1;
    }
}
} 
DelayQueue 仅适用于单机部署的应用,对于分布式场景无法适用,同时也不适用于队列元素量很大的场景,不支持持久化。
Redis key过期回调
redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。
实现步骤
- 
修改redis.conf文件配置如下参数
notify-keyspace-events Ex - 
客户端订阅
redis key过期后系统会publish 频道(channel)
__keyevent@0__:expired其中__keyevent为固定前缀,@0表示db0,订阅是可根据自己的dbindex进行调整,:expired表示过期事件。客户端可通过SUBSCRIBE或PSUBSCRIBE订阅,如SUBSCRIBE __keyevent@0__:expired监听db0的key过期事件。 
示例
     public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {
     
         public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
             super(listenerContainer);
         }
     
         @Override
         public void onMessage(Message message, byte[] pattern) {
             String redisKey = message.toString();
             log.info("监听到key: {}  过期" , redisKey);
         }
     
         @Configuration
         static class RedisKeyExpiredConfig {
     
             /************注册redis监听bean************/
             @Bean
             RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
                 RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
                 listenerContainer.setConnectionFactory(connectionFactory);
                 return listenerContainer;
             }
     
             @Bean
             KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
                 return new RedisKeyExpiredListener(listenerContainer);
             }
         }
     
     }
存在的问题
key的失效通知无法保证时效性。redis过期策略有一下三种:
| 策略 | 说明 | 优点 | 缺点 | 
|---|---|---|---|
| 定时删除 | 在设置 Key 过期时间的同时创建定时器,让定时器在 Key 过期时执行删除操作 | 保证过期数据能被及时删除 | 耗 CPU,尤其当存在大量非永久 Key 时,对 CPU 影响更严重 | 
| 惰性删除 | Key 过期时不主动删除,获取数据时判断该 Key 是否过期,如果过期直接删除 | 对 CPU 消耗小 | 耗内存,如果数据过期但又没有任何操作来获取该数据,哪怕数据已经过期了,但该数据任会一直存在 | 
| 定期删除 | 每隔一段时间执行一次删除操作 | 不如定时删除那么消耗 CPU,也不如惰性删除那么占内存 | 比定时删除更消耗内存,必惰性删除更消耗 CPU | 
默认情况下,Redis 使用的是惰性删除 + 定期删除的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。
可通过如下方式提高时效性
- 将缓存数据与监听过期key数据分离,例如把缓存数据存在 database0,把监听数据存在 database1;让进行监听的库中 key 尽量少,如果不同业务的监听超时时间差异较大,则考虑将不同业务的超时监听数据存放到不同的数据库;
 - 调整过期策略为定时删除策略,但这样CPU定时器的开销会增大。
 
Redis zset
redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。
实现思路
将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。
@Slf4j
@Component
public class TestZsetDelayTask {
    @Autowired
    private StringRedisTemplate redisTemplate;
    private static final String REDIS_DELAY_TASK_KEY="test_delay_task";
    @PostConstruct
    public void consumerRedisDelayTask() throws InterruptedException {
      ZSetOperations<String,String> zSetOperations = redisTemplate.opsForZSet();
      while(true){
          //获取当前时间内的第一个任务
        Long score = System.currentTimeMillis();
        Set<String> tasks =  zSetOperations.rangeByScore(REDIS_DELAY_TASK_KEY,0,score);
        if(CollectionUtils.isEmpty(tasks)){
            Thread.sleep(200);
        }else{
            //移除该任务
            String task = (String) tasks.iterator().next();
           if(zSetOperations.remove(REDIS_DELAY_TASK_KEY,task)>0){
               log.info("任务: {} 准备执行" , task);
           }
        }
      }
    }
}
也可以通过lua脚本将zrangebyscore和zrem操作变成原子操作,避免了多线程时同一个me mber多次zrem。
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +
        "if #resultArray > 0 then\n" +
        "    if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
        "        return resultArray[1]\n" +
        "    else\n" +
        "        return ''\n" +
        "    end\n" +
        "else\n" +
        "    return ''\n" +
        "end";
存在的问题
- 存在大量的空轮询不但占用了客户端的 CPU,同时也占用了redis的资源,空轮询的客户端有过多,redis的慢查询可能会显著增多。设置sleep的时间过大也会出现时效性不及时问题。
 - 没有重试和ack机制,客户端异常时,这条任务可能会丢失。
 
可以使用Redission的RDelayedQueue数据结构,其api类似于java queue使用简单,可更方便的实现基于redis的延时队列。感兴趣的可自行了解,这里不再展开。
RabbitMQ延时队列
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。
死信队列
绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message) 之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。
队列出现Dead Letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject / basic.nack),并且requeue = false
RabbitMQ ttl
RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。
x-message-ttl 为队列设置过期时间。
expiration 为消息设置过期时间。
RabbitMQ延时队列示例
@Configuration
@Slf4j
public class RabbitMqDelayQueue {
    /**
     * 普通交换机
     */
    private static final String NORMAL_EXECHANGE = "test-exchange";
    /**
     * 死信交换机名称
     */
    private static final String DLX_EXCHANGE ="test-dlx-exchange";
    /**
     * 普通队列名称
     */
    private static final String NORMAL_QUEUE_NAME = "test-queue";
    /**
     * 死信队列名称
     */
    private static final String DLX_QUEUE_NAME ="test-dlx-queue";
    private static final String ROUTING_KEY="test";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /*************普通交换机队列的声明及绑定*************/
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXECHANGE, true,false);
    }
    @Bean
    public Queue normalQueue(){
        //设置队列过期时间
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl",30000);
        //设置死信后重新路由的交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE);
        args.put("x-dead-letter-routing-key",ROUTING_KEY);
        return new Queue(NORMAL_QUEUE_NAME,true,false,false,args);
    }
    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);
    }
    /*************死信交换机队列的声明及绑定*************/
    @Bean
    public DirectExchange dlxExchange(){
        return new DirectExchange(DLX_EXCHANGE, true,false);
    }
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE_NAME,true);
    }
    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(ROUTING_KEY);
    }
    @RabbitListener(queues = DLX_QUEUE_NAME)
    public void consumerDlxQueue(@Payload String message) {
        log.info("消费到死信消息:{}",message);
    }
    @PostConstruct
    public void sendMessage(){
        Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                //设置消息的过期时间(毫秒)
                .setExpiration("20000")
                .build();
        rabbitTemplate.send(NORMAL_EXECHANGE,ROUTING_KEY,message);
        log.info("发送mq消息成功");
    }
}
存在的问题
ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。
rabbitmq_delayed_message_exchange插件
针对上述的问题,可以使用rabbitmq_delayed_message_exchang插件来解决。
安装该插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
插件的安装
- 进入https://www.rabbitmq.com/community-plugins.html 页面找到rabbitmq_delayed_message_exchang并下载。
 - 将下载的插件复制到rabbitmq的plugins目录
 - 执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用该插件。 
使用示例
/**
     * 延迟消息交换机
     */
    public final static String DELAY_EXCHANGE = "test-delay-exchange";
    /**
     * 队列
     */
    public final static String DELAY_QUEUE = "test-delay-queue";
    /**
     * 路由Key
     */
    public final static String DELAY_ROUTING_KEY = "test-delay-routingKey";
    @Bean
    public CustomExchange delayMessageExchange() {
        //自定义交换机,type必须为x-delayed-message,添加参数x-delayed-type=direct
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
    }
    @Bean
    public Queue delayMessageQueue() {
        return new Queue(DELAY_QUEUE, true);
    }
    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(DELAY_ROUTING_KEY).noargs();
    }
    @PostConstruct
    public void sendDelayMessages(){
        Message message1 = MessageBuilder.withBody("delay message1".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        //设置消息过期时间
        message1.getMessageProperties().setDelay(20000);
        rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message1);
        Message message2 = MessageBuilder.withBody("delay message2".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        //设置消息过期时间
        message2.getMessageProperties().setDelay(15000);
        rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message2);
        log.info("发送mq delay 消息成功");
    }
    @RabbitListener(queues = DELAY_QUEUE)
    public void consumerDelayQueue(@Payload String message) {
        log.info("消费到延时消息:{}",message);
    }
插件的局限
- 插件极限时间是 8byte 长度 ms,大概 49天,如果你的延时时间很长,超过49天那么该消息将会立刻被投递到队列中,不会延时。
 - 该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景。有关详情,请参见#72
 - 如果该插件被禁用那么插件上的延时消息将丢失(还未投递到目标队列的)。
 
时间轮
时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。
原理

图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。
下面介绍下kafka的多层时间轮,层数越高时间跨度越大。

每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。
举个例子
假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。
与kafka时间轮相比,netty采用的是轮次来解决超过时间轮所能表示的范围,通过固定的时间间隔tickDuration扫描,时候未到就等待来进行时间轮的推动,会有空推进的情况,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操作。
Netty时间轮使用示例
public class TimeWheelDealyQueue {
    public static void main(String[] args) {
        /**
         * 参数依次为
         * 1.ThreadFactory 自定义线程工厂,用于创建线程执行TimerTask
         * 2.tickDuration  间隔多久走到下一槽(相当于时钟走一格),值越小,时间轮精度越高
         * 3.unit 定义tickDuration的时间单位
         * 4.ticksPerWheel 一圈有多个槽
         * 5.leakDetection 是否开启内存泄漏检测。
         * 6. maxPendingTimeouts 最多待执行的任务个数。0或负数表示无限制。
         */
        Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 10, TimeUnit.MILLISECONDS, 10);
        System.out.println("开始添加任务:" + System.currentTimeMillis());
        //延迟任务,5秒后执行
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("任务开始执行:"+System.currentTimeMillis());
            }
        }, 5, TimeUnit.SECONDS);
    }
}