延迟性任务实现解析

背景

很多人在面试的时候可能都碰到过这样的一个面试题:设计一个秒杀系统,30分钟没付款就自动关闭交易,这里我们主要来看下在实际的项目中如何结合业务需求来实现类似"xxx分钟后自动完成xxx"这种属于延迟任务的功能。

业务场景

下面来看看具体的业务场景,我们在系统中有很多的需求如活动报名,活动签到,活动取消,活动审核等等都需要发送短信或者消息,其中有些比较特殊的如活动开始或者结束前xxx小时发送消息、活动到期后自动发送通知等等都属于延迟性触发的任务,那么针对于类似这样的任务,一般我们都是怎么处理的呢?

设计思路

由于系统是目前仍属于单机应用,所以在实现上暂时不考虑分布式,为了简单快速采用了JDK自带的本地延迟队列DelayQueue结合redis作为数据灾备的方案.DelayQueue是JUC框架中提供的一个具备延迟机制的队列.
DelayQueue具有有如下特点:

  • 队列中存储的元素必须实现Delayed接口,且元素具有时效性.
  • Delayed接口提供了getDelay方法来返回对象的延迟时间.
  • Delayed接口提供了compareTo方法用于队列内部元素的比较排序.
  • 内部使用了优先级队列PriorityQueue来实现每次从队首中取出来的都是最先要过期的元素.
  • 实现了BlockingQueue接口,是一个无界阻塞队列,且元素不允许为null.
  • 提供了如阻塞方法take()返回队首元素,put()方法添加元素,remove()方法元素出队等等...

那么大致实现的思路如下:
在创建活动时会将活动id与计算好的时间差值存储到redis缓存中,服务器后台开启守护线程实时监控本地队列中到期的任务,并触发相应的推送操作,同时为了防止服务器意外重启等情况,在系统初始化时会将缓存数据load到本地队列中,这样可以避免由于数据丢失导致消息与短信数据没有推送到,下面来看看实现步骤:
1.先创建任务对象dto,内部定义了任务标识ID与延迟时间戳并实现Delayed接口和Runnable接口,我们来看看其中一个dto的实现,其他的类似,代码如下:

public class NotifyDto implements Delayed, Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    private final static Logger logger = LoggerFactor.getLogger(NotifyDto.class);

    private String redisMsg;//缓存数据即任务标识ID
    private long expireTime;//延迟时间

        //带参构造函数
    public NotifyDto(String redisMsg, long delayTime) {
        this.redisMsg = redisMsg;
        this.expireTime = TimeUnit.NANOSECONDS.convert(delayTime,
                TimeUnit.MILLISECONDS) + System.nanoTime();
    }
        
    public String getRedisMsg() {
        return redisMsg;
    }

    public void setRedisMsg(String redisMsg) {
        this.redisMsg = redisMsg;
    }

    public long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }
    
    /**
     * 用于延迟队列内部比较排序   当前对象的延迟时间 - 比较对象的延迟时间
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     **/
    @Override
    public int compareTo(Delayed o) {
        NotifyDto task = (NotifyDto) o;
        long result = this.getDelay(TimeUnit.NANOSECONDS)
                - task.getDelay(TimeUnit.NANOSECONDS);
        if (result < 0) {
            return -1;
        } else if (result > 0) {
            return 1;
        } else {
            return 0;
        }
    }

    /**
     * 返回对象延迟时间
     * @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
     **/
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.nanoTime(),
                TimeUnit.NANOSECONDS);
    }

    /**
     * 任务回调处理
     * @see java.lang.Runnable#run()
     **/
    @Override
    public void run() {
        logger.debug("当前任务队列:msgQueue,延迟时间:{},活动ID:{}", this.expireTime
                + "=========================================", this.redisMsg);
        this.msgPus h(this.redisMsg);
    }

    private void msgPush(String redisMsg) {
        Map<String, Object> msgMap = new HashMap<>();
        Integer activeId = NumberHelpUtils.toInt(redisMsg);
        //省略发送消息动作
                //清空缓存记录
        JedisUtils.zRemove("active:review:notify", activeId + "");
    }

    @Override
    public String toString() {
        return "NotifyDto [redisMsg=" + redisMsg + ", expireTime=" + expireTime
                + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((redisMsg == null) ? 0 : redisMsg.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        NotifyDto other = (NotifyDto) obj;
        if (redisMsg == null) {
            if (other.redisMsg != null)
                return false;
        } else if (!redisMsg.equals(other.redisMsg))
            return false;
        return true;
    }
}

2.创建一个任务调度服务service监控所有队列中的过期任务对象,其内部包含各种任务队列初始化,出队与入队等方法,大致代码如下.

public class MsgQueueService {

    private final static Logger logger = LoggerFactory
            .getLogger(MsgQueueService.class);

    private MsgQueueService() {

    }

    private static class MsgHolder {
        private static MsgQueueService msgQueueService = new MsgQueueService();
    }

    public static MsgQueueService getInstance() {
        return MsgHolder.msgQueueService;
    }

    /**
     * 执行任务线程池
     */
    private final static Executor es = Executors.newFixedThreadPool(5);

    /**
     * 创建3个守护线程
     */
    private Thread expireActiveThread;

    private Thread startByActiveThread;

    private Thread beforeEndActiveThread;

    /**
     * 创建延迟任务队列
     */
    private DelayQueue<NotifyDto> msgQueue = new DelayQueue<>();

    private DelayQueue<StartByNotifyDto> msgQueue2 = new DelayQueue<>();

    private DelayQueue<BeforEndNotifyDto> msgQueue3 = new DelayQueue<>();

    /**
     * 
     * 系统启动时初始化
     */
    public void init() {
        //初始化数据
        initRedisMsg();
        //监听活动结束后任务
        expireActiveThread = new Thread(() -> execute());
        expireActiveThread.setDaemon(true);
        expireActiveThread.setName("ExpireActive Daemon Thread");
        expireActiveThread.start();
        //监听活动开始前2小时任务
        startByActiveThread = new Thread(() -> execute2());
        startByActiveThread.setDaemon(true);
        startByActiveThread.setName("StartByActive Daemon Thread");
        startByActiveThread.start();
        //监听活动结束前2小时任务
        beforeEndActiveThread = new Thread(() -> execute3());
        beforeEndActiveThread.setDaemon(true);
        beforeEndActiveThread.setName("BeforeEndActive Daemon Thread");
        beforeEndActiveThread.start();
    }

    /**
     * 
     * 从Redis中初始化任务到队列中
     */
    public void initRedisMsg() {
        Set<String> keySet = JedisUtils.zRange("active:review:notify", 0, -1);
        if (CollectionHelpUtils.isNotEmpty(keySet)) {
            keySet.stream().forEach(
                    o -> {
                        long expireTime = JedisUtils.zScore(
                                "active:review:notify", o).longValue();
                        NotifyDto redisTask = new NotifyDto(o, expireTime
                                - System.currentTimeMillis());
                        this.push(redisTask);
                    });
        }
        Set<String> applyStartSet = JedisUtils.zRange(
                "active:applyStart:notify", 0, -1);
        if (CollectionHelpUtils.isNotEmpty(applyStartSet)) {
            applyStartSet.stream().forEach(
                    o -> {
                        long expireTime = JedisUtils.zScore(
                                "active:applyStart:notify", o).longValue();
                        StartByNotifyDto redisTask = new StartByNotifyDto(o,
                                expireTime - System.currentTimeMillis());
                        this.push(redisTask);
                    });
        }
        Set<String> applyEndSet = JedisUtils.zRange("active:applyEnd:notify",
                0, -1);
        if (CollectionHelpUtils.isNotEmpty(applyEndSet)) {
            applyEndSet.stream().forEach(
                    o -> {
                        long expireTime = JedisUtils.zScore(
                                "active:applyEnd:notify", o).longValue();
                        BeforEndNotifyDto redisTask = new BeforEndNotifyDto(o,
                                expireTime - System.currentTimeMillis());
                        this.push(redisTask);
                    });
        }
    }

    /**
     * 监听队列,如果没有过期对象则阻塞
     * @param es
     */
    private void execute() {
        while (true) {
            try {
                NotifyDto task = msgQueue.take();
                if (task != null) {
                    logger.debug("当前任务队列:{},执行时间:{}", "msgQueue",
                            DateTimeUtils.getTime());
                                        //此处真正执行了任务对象中的run方法,触发了业务推送动作
                    es.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
    }

    private void execute2() {
        while (true) {
            try {
                StartByNotifyDto task2 = msgQueue2.take();
                if (task2 != null) {
                    logger.debug("当前任务队列:{},执行时间:{}", "msgQueue2",
                            DateTimeUtils.getTime());
                                        //此处真正执行了任务对象中的run方法,触发了业务推送动作
                    es.execute(task2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
    }

    private void execute3() {
        while (true) {
            try {
                BeforEndNotifyDto task3 = msgQueue3.take();
                if (task3 != null) {
                    logger.debug("当前任务队列:{},执行时间:{}", "msgQueue3",
                            DateTimeUtils.getTime());
                                        //此处真正执行了任务对象中的run方法,触发了业务推送动作
                    es.execute(task3);
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
    }

    /**
     * 
     * 任务入队
     * @param time
     * @param task
     */
    public void push(NotifyDto task) {
        msgQueue.put(task);
    }

    public void push(StartByNotifyDto task) {
        msgQueue2.put(task);
    }

    public void push(BeforEndNotifyDto task) {
        msgQueue3.put(task);
    }

    /**
     * 
     * 任务出队
     * @param task
     */
    public void remove(NotifyDto task) {
        msgQueue.remove(task);
    }

    public void remove(StartByNotifyDto task) {
        msgQueue2.remove(task);
    }

    public void remove(BeforEndNotifyDto task) {
        msgQueue3.remove(task);
    }
}

注意: MsgQueueService的实现使用了单例模式,并且其中的init方法在系统初始化时被调度执行.
3.创建一个系统初始化服务类,默认实现Spring框架提供的接口InitializingBean,重写其接口方法afterPropertiesSet以便在系统启动后自动执行初始化逻辑.

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.debug("系统初始化...");
        MsgQueueService.getInstance().init();
  }

4.最后一步就是具体的业务逻辑处理了,我们在业务service中的调用方法代码如下:

    /**
     * 
     * 把活动任务推送到延迟队列中
     * @param activeId
     */
    private void Msg2Queue(Active active, boolean isAdd) {
        long delayTime = 0;
        if (isAdd) {
            //计算出当前任务延迟时间
            delayTime = active.getActiveEndTime().getTime()
                    - active.getCreateTime().getTime();
            //推送到redis中
            JedisUtils.zAdd("active:review:notify", active.getActiveEndTime()
                    .getTime(), active.getId() + "");
            //活动到期未点评的任务入队
            NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
            MsgQueueService.getInstance().push(task);
            //判断活动报名截止状态
            if (active.getApplyAudit() == 1) {
                if (active.getApplyAbort() == 1) {
                    //活动开始截止报名
                    delayTime = active.getActiveStartTime().getTime() - 2 * 60
                            * 60 * 1000 - System.currentTimeMillis();
                    //推送到redis中
                    JedisUtils.zAdd("active:applyStart:notify", active
                            .getActiveStartTime().getTime()
                            - 2
                            * 60
                            * 60
                            * 1000, active.getId() + "");
                    StartByNotifyDto task2 = new StartByNotifyDto(
                            active.getId() + "", delayTime);
                    MsgQueueService.getInstance().push(task2);
                } else {
                    //活动结束前可报名
                    delayTime = active.getActiveEndTime().getTime() - 2 * 60
                            * 60 * 1000 - System.currentTimeMillis();
                    //推送到redis中
                    JedisUtils.zAdd("active:applyEnd:notify", active
                            .getActiveEndTime().getTime() - 2 * 60 * 60 * 1000,
                            active.getId() + "");
                    BeforEndNotifyDto task3 = new BeforEndNotifyDto(
                            active.getId() + "", delayTime);
                    MsgQueueService.getInstance().push(task3);
                }
            }
        } else {
            //动态取消任务
            Double score = JedisUtils.zScore("active:review:notify",
                    active.getId() + "");
            if (score != null) {
                long expireTime = score.longValue();
                MsgQueueService.getInstance().remove(
                        new NotifyDto(active.getId() + "", expireTime));
                //清空redis中记录
                JedisUtils.zRemove("active:review:notify", active.getId() + "");
                //重新计算出当前任务延迟时间
                delayTime = active.getActiveEndTime().getTime()
                        - System.currentTimeMillis();
                //推送到redis中
                JedisUtils.zAdd("active:review:notify", active
                        .getActiveEndTime().getTime(), active.getId() + "");
                //延迟任务入队
                NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
                MsgQueueService.getInstance().push(task);
            }
        }
    }

注意:由于实际业务中任务可以被修改或取消,所以定义任务dto时需重写其hashcode方法与equals方法,来防止队列中的对象出现冲突,由于dto中的redisMsg字段对应了mysql中的表自增主键,所以我们使用了这个字段来重写这两个方法.

总结

此方案的优点:

  • 代码实现相对比较简单,利用JDK自带的容器类来解决延迟处理问题,无需自己造轮子.
  • 效率高,任务触发时间延迟低.
  • 由于线程安全还可以实现多生产者与消费者.
  • 结合redis做数据灾备,避免由于服务重启或其他异常退出导致的数据丢失问题.

此方案的缺点:

  • DelayQueue属于单机队列,若在分布式集群环境下,要自己做横向扩展以实现高可用,难度较高.
  • 服务器一旦宕机,数据将丢失,需结合其他底层存储做持久化,增加了编码的复杂性.
  • 数据存储在单机内存中,受物理条件限制,数据量大时容易OOM.

其他的解决方案:

  • JDK自带的线程池ScheduledExecutorService.
  • 简单的定时任务轮询,扫描数据表,数据量大时会有性能瓶颈.
  • 可以定时任务结合redis,任务和到期时间都保存在redis中,启动定时任务扫描redis,到期的key删除,并且异步更新数据库.
  • google guava的缓存也可实现类似的功能.
  • 消息队列如ActiveMQ或者RobbitMQ都提供了死信队列可实现延迟功能,设定任务的到期时间,到期之后自动进入死信队列,后台开启守护线程监控死信队列.
  • 时间轮算法,Netty提供了一个HashedWheelTimer来实现.
  • 利用redis的zset可实现延迟队列或者redis的Keyspace Notifications(键空间机制)实现key失效后提供回调函数.
    ......
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容