DelayQueue之通用组件

我们产品的的业务中有那么一个场景,在医生关闭问诊的3min后,患者将无法继续和医生进行对话。我根据对业务的理解,和对技术实现成本的衡量,决定采用通过DelayQueue的方式来实现的方案。

关于DelayQueue的相关内容介绍和核心源码解析已在上一篇DelayQueue之源码分析说明了。

据我所知,生活中有如下场景可以用得到DelayQueue:
1、下单后一段时间(业内基本上都是30分钟)内不付款,就自动取消订单。
2、提交打车申请后,一段时间内(比如说30秒)没有附近的司机接单,就自动发送发送给更多的司机。

这类场景都有如下特点:
1、需要有一段时间的延迟,如果仅仅是为了异步执行,那么消息队列显然是是更优的选择。
2、对执行时间的精确度有一定要求,当然异常状况下,也可以对精确度适当放宽松。比如场景1的订单取消,规则设置为30分钟不支付就取消,但实际场景中,精确到30分自然是最好结果,但假如出现故障,那么在可允许的范围内将订单取消也是可以接受的(比如说在放宽到32分钟内)。
3、执行是高频率的。这点需要和第2点结合起来看,如果仅仅是为了低频率的定时执行,个人认为任务调度也是可行的。

综合来看,如果不需要延迟执行,那么推荐用消息队列;如果对执行时间的精确度不那么在意且执行频率不高,那么推荐使用任务调度;如果需要延迟执行,且执行比较高频,对执行时间的精确度有一定要求,可以考虑使用延迟队列。
以上这些是我们为何采用DelayQueue来实现这个业务场景的原因。

为了方便使用DelayQueue,我封装了组件对DelayQueue进行了扩展。

首先我定义了一个类TaskMessage,对Delayed进行了扩展,实现了compareTo和getDelay方法。
如下是TaskMessage类的核心代码。

public class TaskMessage implements Delayed {

    private String body;  //消息内容
    private long executeTime;//执行时间
    private Function function;//执行方式
    
    public TaskMessage(Long delayTime,String body,  Function function) {
        this.body = body;
        this.function = function;
        this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed delayed) {
        TaskMessage msg = (TaskMessage) delayed;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

外部调用只需要TaskMessage m1 = new TaskMessage(delayTime, body, function)就可以生成一个延迟任务的元素了,内部自动就根据延迟时间计算出这个延迟任务元素的预期执行时间。
Function是1.8版引入的函数式接口,主要方法是R apply(T t),功能是将Function对象应用到输入的参数上,然后返回计算结果。
那么达到延迟任务的预期执行时间时,只需要调用一下function.apply()方法就可以了,不需要关心apply的具体实现。apply的具体实现方法是在调用时才明确的。

然后定义一个延迟任务的执行线程类TaskConsumer,实现了Runnable,重写了run方法。因为延迟任务的执行,必然是需要重新起线程去执行的,不能阻碍主线程的操作。
如下是TaskConsumer类的核心代码。

public class TaskConsumer implements Runnable {
        
    @Override
    public void run() {
        while (signal) {
            try {
                TaskMessage take = queue.take();
                if (logger.isInfoEnabled()) {
                    logger.info("处理线程的id为" + threadId + ",消费消息内容为:" + take.getBody() + ",预计执行时间为" +
                            DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
                }
                take.getFunction().apply(take.getBody());
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("id为" + threadId + "的处理线程被强制中断");
                }
            } catch (Exception e) {
                logger.error("taskConsumer error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("id为" + threadId + "的处理线程已停止");
        }
    }
}

这个类核心代码就只有如下两行。
TaskMessage take = queue.take(); 获取延迟队列的队首元素。前文已经解释过,Queue的take方法会返回队列的队首元素,否则就会挂起线程。所以只要有返回值,必然就能获取到当前需要执行的TaskMessage元素。
take.getFunction().apply(take.getBody()); 执行延迟任务元素的apply方法。applay方法是在定义TaskMessage的时候确定的,表明了到达预期执行时间所需要进行的一系列操作,那么此时只需要执行对应的apply方法就可以了。

最后是加载TaskConsumer的统一管理类TaskManager。
如下是TaskManager类的核心代码。

public class TaskManager implements ApplicationContextAware,
        InitializingBean,DisposableBean{

   
    @Override
    public void afterPropertiesSet() throws Exception {
        for (int i = 0; i < threadCount; i++) {
            TaskConsumer taskConsumer = new TaskConsumer(queue, i);
            taskConsumerList.add(taskConsumer);
            Thread thread = new Thread(taskConsumer);
            threadList.add(thread);
            thread.start();
        }
    }

    @Override
    public void destroy() throws Exception {
        for(int i=0;i<threadList.size();i++){
            threadList.get(i).interrupt();
            taskConsumerList.get(i).setSignal(Boolean.FALSE);
        }
    }
}

这个类的作用在于初始化类后,就启动线程不断的去获取延迟任务。然后在销毁类后,先中断消费者线程,然后设置信号量使得消费者线程的run方法能跳出死循环,从而使得消费线程正常结束。

最后是如何调用的示例。很简单,就只有两步:
1、生成延迟任务元素taskMessage
2、将taskMessage添加到延迟队列中

TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
        function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);

ok,以上是如何扩展DelayQueue的功能构造成高可用的组件的方案,欢迎大家来一起讨论。

下一章我准备讲一下我们项目中运用DelayQueue的过程中碰到的问题以及如何持久化的方案。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容