队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。
延时队列的使用场景
订单提交后一定时间内未支付需要自动取消。
接口调用失败后阶梯式的补偿调用。
任务超时提醒。
预定会议提前十五分钟通知与会人员参加会议。
延时队列常用实现方式
java DelayQueue延时队列
DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素,当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。
示例
队列中的元素必须实现Delayed接口
public class MeetingNotice implements Delayed {
private long noticeTime;
private long meetingId;
public MeetingNotice(long meetingId, long noticeTime, TimeUnit unit) {
this.name = name;
this.noticeTime = System.currentTimeMillis() + (noticeTime > 0 ? unit.toMillis(noticeTime) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return noticeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
MeetingNotice meetingNotice = (MeetingNotice) o;
long diff = this.time - meetingNotice.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
DelayQueue 仅适用于单机部署的应用,对于分布式场景无法适用,同时也不适用于队列元素量很大的场景,不支持持久化。
Redis key过期回调
redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。
实现步骤
-
修改redis.conf文件配置如下参数
notify-keyspace-events Ex
-
客户端订阅
redis key过期后系统会publish 频道(channel)
__keyevent@0__:expired
其中__keyevent
为固定前缀,@0
表示db0,订阅是可根据自己的dbindex进行调整,:expired
表示过期事件。客户端可通过SUBSCRIBE
或PSUBSCRIBE
订阅,如SUBSCRIBE __keyevent@0__:expired
监听db0的key过期事件。
示例
public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {
public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String redisKey = message.toString();
log.info("监听到key: {} 过期" , redisKey);
}
@Configuration
static class RedisKeyExpiredConfig {
/************注册redis监听bean************/
@Bean
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
return listenerContainer;
}
@Bean
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
return new RedisKeyExpiredListener(listenerContainer);
}
}
}
存在的问题
key的失效通知无法保证时效性。redis过期策略有一下三种:
策略 | 说明 | 优点 | 缺点 |
---|---|---|---|
定时删除 | 在设置 Key 过期时间的同时创建定时器,让定时器在 Key 过期时执行删除操作 | 保证过期数据能被及时删除 | 耗 CPU,尤其当存在大量非永久 Key 时,对 CPU 影响更严重 |
惰性删除 | Key 过期时不主动删除,获取数据时判断该 Key 是否过期,如果过期直接删除 | 对 CPU 消耗小 | 耗内存,如果数据过期但又没有任何操作来获取该数据,哪怕数据已经过期了,但该数据任会一直存在 |
定期删除 | 每隔一段时间执行一次删除操作 | 不如定时删除那么消耗 CPU,也不如惰性删除那么占内存 | 比定时删除更消耗内存,必惰性删除更消耗 CPU |
默认情况下,Redis 使用的是惰性删除 + 定期删除的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。
可通过如下方式提高时效性
- 将缓存数据与监听过期key数据分离,例如把缓存数据存在 database0,把监听数据存在 database1;让进行监听的库中 key 尽量少,如果不同业务的监听超时时间差异较大,则考虑将不同业务的超时监听数据存放到不同的数据库;
- 调整过期策略为定时删除策略,但这样CPU定时器的开销会增大。
Redis zset
redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。
实现思路
将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。
@Slf4j
@Component
public class TestZsetDelayTask {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String REDIS_DELAY_TASK_KEY="test_delay_task";
@PostConstruct
public void consumerRedisDelayTask() throws InterruptedException {
ZSetOperations<String,String> zSetOperations = redisTemplate.opsForZSet();
while(true){
//获取当前时间内的第一个任务
Long score = System.currentTimeMillis();
Set<String> tasks = zSetOperations.rangeByScore(REDIS_DELAY_TASK_KEY,0,score);
if(CollectionUtils.isEmpty(tasks)){
Thread.sleep(200);
}else{
//移除该任务
String task = (String) tasks.iterator().next();
if(zSetOperations.remove(REDIS_DELAY_TASK_KEY,task)>0){
log.info("任务: {} 准备执行" , task);
}
}
}
}
}
也可以通过lua脚本将zrangebyscore
和zrem
操作变成原子操作,避免了多线程时同一个me mber多次zrem。
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +
"if #resultArray > 0 then\n" +
" if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
" return resultArray[1]\n" +
" else\n" +
" return ''\n" +
" end\n" +
"else\n" +
" return ''\n" +
"end";
存在的问题
- 存在大量的空轮询不但占用了客户端的 CPU,同时也占用了redis的资源,空轮询的客户端有过多,redis的慢查询可能会显著增多。设置sleep的时间过大也会出现时效性不及时问题。
- 没有重试和ack机制,客户端异常时,这条任务可能会丢失。
可以使用Redission的RDelayedQueue数据结构,其api类似于java queue使用简单,可更方便的实现基于redis的延时队列。感兴趣的可自行了解,这里不再展开。
RabbitMQ延时队列
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。
死信队列
绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message)
之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。
队列出现Dead Letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject / basic.nack),并且requeue = false
RabbitMQ ttl
RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。
x-message-ttl 为队列设置过期时间。
expiration 为消息设置过期时间。
RabbitMQ延时队列示例
@Configuration
@Slf4j
public class RabbitMqDelayQueue {
/**
* 普通交换机
*/
private static final String NORMAL_EXECHANGE = "test-exchange";
/**
* 死信交换机名称
*/
private static final String DLX_EXCHANGE ="test-dlx-exchange";
/**
* 普通队列名称
*/
private static final String NORMAL_QUEUE_NAME = "test-queue";
/**
* 死信队列名称
*/
private static final String DLX_QUEUE_NAME ="test-dlx-queue";
private static final String ROUTING_KEY="test";
@Autowired
private RabbitTemplate rabbitTemplate;
/*************普通交换机队列的声明及绑定*************/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange(NORMAL_EXECHANGE, true,false);
}
@Bean
public Queue normalQueue(){
//设置队列过期时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl",30000);
//设置死信后重新路由的交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE);
args.put("x-dead-letter-routing-key",ROUTING_KEY);
return new Queue(NORMAL_QUEUE_NAME,true,false,false,args);
}
@Bean
public Binding normalBinding(){
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);
}
/*************死信交换机队列的声明及绑定*************/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(DLX_EXCHANGE, true,false);
}
@Bean
public Queue dlxQueue(){
return new Queue(DLX_QUEUE_NAME,true);
}
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(ROUTING_KEY);
}
@RabbitListener(queues = DLX_QUEUE_NAME)
public void consumerDlxQueue(@Payload String message) {
log.info("消费到死信消息:{}",message);
}
@PostConstruct
public void sendMessage(){
Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
//设置消息的过期时间(毫秒)
.setExpiration("20000")
.build();
rabbitTemplate.send(NORMAL_EXECHANGE,ROUTING_KEY,message);
log.info("发送mq消息成功");
}
}
存在的问题
ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。
rabbitmq_delayed_message_exchange插件
针对上述的问题,可以使用rabbitmq_delayed_message_exchang
插件来解决。
安装该插件后会生成新的Exchange类型x-delayed-message
,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia
(一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过x-delayed-type
类型标记的交换机类型投递至目标队列。
插件的安装
- 进入https://www.rabbitmq.com/community-plugins.html 页面找到rabbitmq_delayed_message_exchang并下载。
- 将下载的插件复制到rabbitmq的plugins目录
- 执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用该插件。
使用示例
/**
* 延迟消息交换机
*/
public final static String DELAY_EXCHANGE = "test-delay-exchange";
/**
* 队列
*/
public final static String DELAY_QUEUE = "test-delay-queue";
/**
* 路由Key
*/
public final static String DELAY_ROUTING_KEY = "test-delay-routingKey";
@Bean
public CustomExchange delayMessageExchange() {
//自定义交换机,type必须为x-delayed-message,添加参数x-delayed-type=direct
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
}
@Bean
public Queue delayMessageQueue() {
return new Queue(DELAY_QUEUE, true);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(DELAY_ROUTING_KEY).noargs();
}
@PostConstruct
public void sendDelayMessages(){
Message message1 = MessageBuilder.withBody("delay message1".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
//设置消息过期时间
message1.getMessageProperties().setDelay(20000);
rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message1);
Message message2 = MessageBuilder.withBody("delay message2".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
//设置消息过期时间
message2.getMessageProperties().setDelay(15000);
rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message2);
log.info("发送mq delay 消息成功");
}
@RabbitListener(queues = DELAY_QUEUE)
public void consumerDelayQueue(@Payload String message) {
log.info("消费到延时消息:{}",message);
}
插件的局限
- 插件极限时间是 8byte 长度 ms,大概 49天,如果你的延时时间很长,超过49天那么该消息将会立刻被投递到队列中,不会延时。
- 该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景。有关详情,请参见#72
- 如果该插件被禁用那么插件上的延时消息将丢失(还未投递到目标队列的)。
时间轮
时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。
原理
图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。
下面介绍下kafka的多层时间轮,层数越高时间跨度越大。
每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。
举个例子
假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。
与kafka时间轮相比,netty采用的是轮次来解决超过时间轮所能表示的范围,通过固定的时间间隔tickDuration扫描,时候未到就等待来进行时间轮的推动,会有空推进的情况,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操作。
Netty时间轮使用示例
public class TimeWheelDealyQueue {
public static void main(String[] args) {
/**
* 参数依次为
* 1.ThreadFactory 自定义线程工厂,用于创建线程执行TimerTask
* 2.tickDuration 间隔多久走到下一槽(相当于时钟走一格),值越小,时间轮精度越高
* 3.unit 定义tickDuration的时间单位
* 4.ticksPerWheel 一圈有多个槽
* 5.leakDetection 是否开启内存泄漏检测。
* 6. maxPendingTimeouts 最多待执行的任务个数。0或负数表示无限制。
*/
Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 10, TimeUnit.MILLISECONDS, 10);
System.out.println("开始添加任务:" + System.currentTimeMillis());
//延迟任务,5秒后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("任务开始执行:"+System.currentTimeMillis());
}
}, 5, TimeUnit.SECONDS);
}
}