实现延时任务的 4 种实现方案

一、应用场景

在需求开发过程中,我们经常会遇到一些类似下面的场景:

a. 外卖订单超过15分钟未支付,自动取消

b. 使用抢票软件订到车票后,1小时内未支付,自动取消

c. 待处理申请超时1天,通知审核人员经理,超时2天通知审核人员总监

d. 客户预定自如房子后,24小时内未支付,房源自动释放

那么针对这类场景的需求应该如果实现呢,我们最先想到的一般是启个定时任务,来扫描数据库里符合条件的数据,并对其进行更新操作。一般来说spring-quartz 、elasticjob 就可以实现,甚至自己写个 Timer 也可以。

但是这种方式有个弊端,就是需要不停的扫描数据库,如果数据量比较大,并且任务执行间隔时间比较短,对数据库会有一定的压力。另外定时任务的执行间隔时间的粒度也不太好设置,设置长会影响时效性,设置太短又会增加服务压力。我们来看一下有没有更好的实现方式。

二、JDK 延时队列实现

DelayQueue 是 JDK 中 java.util.concurrent 包下的一种无界阻塞队列,底层是优先队列 PriorityQueue。对于放到队列中的任务,可以按照到期时间进行排序,只需要取已经到期的元素处理即可。

具体的步骤是,要放入队列的元素需要实现 Delayed 接口并实现 getDelay 方法来计算到期时间,compare 方法来对比到期时间以进行排序。一个简单的使用例子如下:

package com.lyqiang.delay.jdk;

import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class TestDelayQueue {

    public static void main(String[] args) throws InterruptedException {

        // 新建3个任务,并依次设置超时时间为 20s 10s 30s
        DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L);
        DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L);
        DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L);

        DelayQueue<DelayTask> queue = new DelayQueue<>();
        queue.add(d1);
        queue.add(d2);
        queue.add(d3);
        int size = queue.size();

        System.out.println("当前时间是:" + LocalDateTime.now());

        // 从延时队列中获取元素, 将输出 d2 、d1 、d3
        for (int i = 0; i < size; i++) {
            System.out.println(queue.take() + " ------ " + LocalDateTime.now());
        }
    }
}

class DelayTask implements Delayed {

    private Integer taskId;

    private long exeTime;

    DelayTask(Integer taskId, long exeTime) {
        this.taskId = taskId;
        this.exeTime = exeTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayTask t = (DelayTask) o;
        if (this.exeTime - t.exeTime <= 0) {
            return -1;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "DelayTask{" +
                "taskId=" + taskId +
                ", exeTime=" + exeTime +
                '}';
    }
}

代码的执行结果如下:

使用 DelayQueue, 只需要有一个线程不断从队列中获取数据即可,它的优点是不用引入第三方依赖,实现也很简单,缺点也很明显,它是内存存储,对分布式支持不友好,如果发生单点故障,可能会造成数据丢失,无界队列还存在 OOM 的风险。

三、时间轮算法实现

1996 年 George Varghese 和 Tony Lauck 的论文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一种时间轮管理 Timeout 事件的方式。其设计非常巧妙,并且类似时钟的运行,如下图的原始时间轮有 8 个格子,假定指针经过每个格子花费时间是 1 个时间单位,当前指针指向 0,一个 17 个时间单位后超时的任务则需要运转 2 圈再通过一个格子后被执行,放在相同格子的任务会形成一个链表。

Netty 包里提供了一种时间轮的实现——HashedWheelTimer,其底层使用了数组+链表的数据结构,使用方式如下:

package com.lyqiang.delay.wheeltimer;

import io.netty.util.HashedWheelTimer;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class WheelTimerTest {

    public static void main(String[] args) {

        //设置每个格子是 100ms, 总共 256 个格子
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);

        //加入三个任务,依次设置超时时间是 10s 5s 20s

        System.out.println("加入一个任务,ID = 1, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("执行一个任务,ID = 1, time= " + LocalDateTime.now());
        }, 10, TimeUnit.SECONDS);

        System.out.println("加入一个任务,ID = 2, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("执行一个任务,ID = 2, time= " + LocalDateTime.now());
        }, 5, TimeUnit.SECONDS);

        System.out.println("加入一个任务,ID = 3, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("执行一个任务,ID = 3, time= " + LocalDateTime.now());
        }, 20, TimeUnit.SECONDS);

        System.out.println("等待任务执行===========");
    }
}

代码执行结果如下:

相比 DelayQueue 的数据结构,时间轮在算法复杂度上有一定优势,但用时间轮来实现延时任务同样避免不了单点故障。

四、Redis ZSet 实现

Redis 里有 5 种数据结构,最常用的是 String 和 Hash,而 ZSet 是一种支持按 score 排序的数据结构,每个元素都会关联一个 double 类型的分数,Redis 通过分数来为集合中的成员进行从小到大的排序,借助这个特性我们可以把超时时间作为 score 来将任务进行排序。

使用 zadd key score member 命令向 redis 中放入任务,超时时间作为 score, 任务 ID 作为 member, 使用 zrange key start stop withscores 命令从 redis 中读取任务,使用 zrem key member 命令从 redis 中删除任务。代码如下:

package com.lyqiang.delay.redis;

import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class TestRedisDelay {

    public static void main(String[] args) {

        TaskProducer taskProducer = new TaskProducer();
        //创建 3个任务,并设置超时间为 10s 5s 20s
        taskProducer.produce(1, System.currentTimeMillis() + 10000);
        taskProducer.produce(2, System.currentTimeMillis() + 5000);
        taskProducer.produce(3, System.currentTimeMillis() + 20000);

        System.out.println("等待任务执行===========");

        //消费端从redis中消费任务
        TaskConsumer taskConsumer = new TaskConsumer();
        taskConsumer.consumer();
    }
}

class TaskProducer {

    public void produce(Integer taskId, long exeTime) {
        System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now());
        RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
    }
}

class TaskConsumer {

    public void consumer() {

        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
                    if (taskIdSet == null || taskIdSet.isEmpty()) {
                        //System.out.println("没有任务");
                    } else {
                        taskIdSet.forEach(id -> {
                            long result = RedisOps.getJedis().zrem(RedisOps.key, id);
                            if (result == 1L) {
                                System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
                            }
                        });
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

执行结果如下:

相比前两种实现方式,使用 Redis 可以将数据持久化到磁盘,规避了数据丢失的风险,并且支持分布式,避免了单点故障。

五、MQ 延时队列实现

以 RabbitMQ 为例,它本身并没有直接支持延时队列的功能,但是通过一些特性,我们可以达到实现延时队列的效果。

RabbitMQ 可以为 Queue 设置 TTL,,到了过期时间没有被消费的消息将变为死信——Dead Letter。我们还可以为Queue 设置私信转发 x-dead-letter-exchange,过期的消息可以被路由到另一个 Exchange。下图说明了这个流程,生产者通过不同的 RoutingKey 发送不同过期时间的消息,多个队列分别消费并产生死信后被路由到 exe-dead-exchange,再有一些队列绑定到这个 exchange,从而进行不同业务逻辑的消费。

在 RabbitMQ 界面操作如下:

1、在 g_normal_exchange 发送测试消息

2. 队列 g_queue_10s 绑定到 g_normal_exchange,并设置 x-message-ttl 为 10s 过期,x-dead-letter-exchange 为 g_exe_dead_exchange,可以看到消息到达后,过了 10s 之后消息被路由到g_exe_dead_exchange

3. 绑定到 g_exe_dead_exchange 的队列 g_exe_10s_queue 消费到了这条消息

使用 MQ 实现的方式,支持分布式,并且消息支持持久化,在业内应用比较多,它的缺点是每种间隔时间的场景需要分别建立队列。

六、总结

通过上面不同实现方式的比较,可以很明显的看出各个方案的优缺点,在分布式系统中我们会优先考虑使用 Redis 和 MQ 的实现方式。

在需求开发中实现一个功能的方式多种多样,需要我们进行多维度的比较,才能选择出合理的、可靠的、高效的并且适合自己业务的解决方案。

感谢你看到这里,我是程序员麦冬 ,一个java开发从业者,深耕行业六年了,每天都会分享java相关技术文章或行业资讯

欢迎大家关注和转发文章,后期还有福利赠送!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352