延迟队列

一、场景介绍

1、下单成功,30分钟未支付。支付超时,自动取消订单

2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评

3、下单成功,商家5分钟未接单,订单取消

4、配送超时,推送短信提醒

5、三天会员试用期,三天到期后准时准点通知用户,试用产品到期了

基于以上场景这个时候我们可能最先想到的应该是采用定时任务去进行轮训判断,但是呢,这个时间怎么确定才好呢,5分钟。。1分钟。。执行一次吗。这样就会非常影响性能。且时间误差很大。基于以上业务需要我们想到了有以下解决方案。

1、JDK延迟队列,但是数据都在内存中,重启后什么都没了。

2、MQ中的延迟队列,比如RocketMQ,但是社区版不能自定义。

3、elasticjob一次性动态任务

4、基于Redis实现

二、基于redis实现

redis也可以用来实现延时消息的功能。理论上也有两种方式

1、订阅 key 过期事件(pub/sub)

2、使用 sorted-set 存储消息,score为消息的过期时间

然而实际上订阅过期事件存在诸多问题,所以并不合适:

1、过期事件的不准确,过期时间只在key被删除时才触发,并不是在key过期后就马上删除的

2、pub/sub 不支持持久化,服务器宕机期间的事件会丢失

3、pub/sub 存在丢失的可能,线上使用的redis pub/sub 有丢失过消息(非过期时间)

4、所有的key过期都会发送过期事件,对redis性能有一定影响。(除非单独使用一个redis作为队列服务)

基于以上问题引出今天的主角redisson 的 RDelayedQueue ,下面就梳理下redis延时队列的使用和原理。

redisson实际上是使用了 两个队列 + 一个 sorted-set + pub/sub 来实现延时队列,而不是单一的sort-set,各自功能如下。

1、sorted-set:存放未到期的消息&到期时间,提供消息延时排序功能

2、两个队列:中转队列;目标队列

a、目标队列

//目标队列-阻塞队列-消费队列,存放到期后的消息,提供消费

RBlockingQueue blockingQueue =redissonClient.getBlockingQueue(queueName());

b、中转队列

//中转队列-存放未到期消息,作为消息的原始顺序视图,提供如查询、删除指定第几条消息的功能

RDelayedQueue delayedQueue =redissonClient.getDelayedQueue(blockingQueue);

3、pub/sub:创建延时队列的时候会创建一个QueueTransferTask,在里面会订阅一个topic

在网上扒了一张延迟消息从发送到执行的流程图如下,

1、首先创建延时队列的时候,会创建一个QueueTransferTask, 在里面会订阅一个topic,订阅成功后,执行其pushTask方法,里面会查询sorted-set中100个已到期的消息,将其push到RBlockingQueue中,并从sorted-set和RDelayedQueue 中移除。(这里是为了投递历史未处理的消息)

2、发送延时消息时,会将消息写入RDelayedQueue  和 sorted-set 中,msg会添加一个randomId,支持发送相同的消息。并且判断sorted-set首调消息如果是刚插入的,则publish timeout(到期时间) 到 topic

3、订阅到topic消息后,会先判断其是否临期(delay<10ms),如果是则调用pushTask方法(1中有说明),不是则启动一个定时任务(使用的netty时间轮),延时delay后执行pushTask方法。

再结合源码简单描述一下:

1、首选初始化两个队列,通过redissonClient的get方法创建的

getDelayedQueue方法里面实例化RedissonDelayedQueue对象,我们看看RedissonDelayedQueue的代码

org.redisson.RedissonDelayedQueue#RedissonDelayedQueue

会创建一个QueueTransferTask, 在里面会订阅一个topic,订阅成功后,执行其pushTask方法

看下pushTaskAsync方法,通过lua脚本实现,分析lua脚本里面会查询sorted-set中100个已到期的消息,将其push到RBlockingQueue中,并从sorted-set和RDelayedQueue 中移除。
@Override

        protected RFuture<Long> pushTaskAsync() {

            return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,

                    "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "

                  + "if #expiredValues > 0 then "

                      + "for i, v in ipairs(expiredValues) do "

                          + "local randomId, value = struct.unpack('dLc0', v);"

                          + "redis.call('rpush', KEYS[1], value);"

                          + "redis.call('lrem', KEYS[3], 1, v);"

                      + "end; "

                      + "redis.call('zrem', KEYS[2], unpack(expiredValues));"

                  + "end; "

                    // get startTime from scheduler queue head task

                  + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "

                  + "if v[1] ~= nil then "

                    + "return v[2]; "

                  + "end "

                  + "return nil;",

                  Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),

                  System.currentTimeMillis(), 100);

        }

2、发送延时消息时,会将消息写入 RDelayedQueue和 sorted-set 中,msg会添加一个randomId,支持发送相同的消息。并且判断sorted-set首调消息如果是刚插入的,则publish timeout(到期时间) 到 topic,发送消息调用如下方法,我们来看看

org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)

public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {

    if (delay < 0) {

        throw new IllegalArgumentException("Delay can't be negative");

    }

    long delayInMs = timeUnit.toMillis(delay);

    long timeout = System.currentTimeMillis() + delayInMs;

    long randomId = ThreadLocalRandom.current().nextLong();

    return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,

            "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"

          + "redis.call('zadd', KEYS[2], ARGV[1], value);"

          + "redis.call('rpush', KEYS[3], value);"

          // if new object added to queue head when publish its startTime

          // to all scheduler workers

          + "local v = redis.call('zrange', KEYS[2], 0, 0); "

          + "if v[1] == value then "

            + "redis.call('publish', KEYS[4], ARGV[1]); "

          + "end;",

          Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),

          timeout, randomId, encode(e));

}

3、订阅到topic消息后,会先判断其是否临期(delay<10ms),如果是则调用pushTask方法(1中有说明),不是则启动一个定时任务(使用的netty时间轮),延时delay后执行pushTask方法。

org.redisson.QueueTransferTask#scheduleTask

// 订阅topic onMessage 时调用

private void scheduleTask(final Long startTime) {

    TimeoutTask oldTimeout = lastTimeout.get();

    if (startTime == null) {

        return;

    }


    if (oldTimeout != null) {

        oldTimeout.getTask().cancel();

    }

    long delay = startTime - System.currentTimeMillis();

    if (delay > 10) {

    // 使用 netty 时间轮 启动一个定时任务

        Timeout timeout = connectionManager.newTimeout(new TimerTask() {                   

            @Override

            public void run(Timeout timeout) throws Exception {

                pushTask();


                TimeoutTask currentTimeout = lastTimeout.get();

                if (currentTimeout.getTask() == timeout) {

                    lastTimeout.compareAndSet(currentTimeout, null);

                }

            }

        }, delay, TimeUnit.MILLISECONDS);

        if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {

            timeout.cancel();

        }

    } else {

        pushTask();

    }

}   

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容