DelayQueue 源码解析

如果不想在世界上虚度一生,那就要学习一辈子。

1 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟5秒执行,那么当前线程就会沉睡5秒,等5秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被take;
  3. 不允许空元素。

这三个概念,其实就是三个问题,下文我们会一一看下这三点是如何实现的。

1.2 类图

DelayQueue的类图和之前的队列一样,不多说,关键是DelayQueue类上是有泛型的,如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue中的元素必须是Delayed的子类,Delayed是表达延迟能力的关键接口,其继承了Comparable接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

也就是说DelayQueue队列中的元素必须是实现Delayed接口和Comparable接口的,并覆写了getDelay方法和conpareTo的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现Delayed接口。

除此之外 DelayQueue 还大量使用了PriorityQueue队列的大量功能,这个和 SynchronousQueue 队列很想,大量复用了其他基础类的逻辑,代码示例如下:



PriorityQueue 中文叫做优先级队列,在此处的作用就是可以根据过期时间做优先级排序,让先过期的可以先执行,用来实现类注释中的第一点。

这里的复用的思想还是蛮重要的,我们在源码中经常遇到这种思想,比如说LinkedHashMap复用HashMap的能力,Set复用Map的能力,还有此处的DelayQueue复用PriorityQueue的能力。小结一下,如果想要复用需要做到哪些:

  1. 需要把能遇见可复用的功能尽量抽象,并开放出可扩展的地方,比如说HashMap在操作数组的方法中,都给LinkedHashMap开放出很多after开头的方法,便于LinkedHashMap进行排序、删除等等;
  2. 采用组合或继承两种手段进行复用,比如LinkedHashMap采用的继承、Set和DelayQueue采用的组合,组合的意思就是把可复用的类给依赖进来。

2 演示

为了方便大家理解,写了一个演示的demo,演示了一下:

public class DelayQueueDemo {
    // 队列消息的生产者
    static class Product implements Runnable {
        private final BlockingQueue queue;

        public Product(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println("begin put");
                long beginTime = System.currentTimeMillis();
                queue.put(new DelayedDTO(System.currentTimeMillis() + 2000L, beginTime));//延迟 2 秒执行
                queue.put(new DelayedDTO(System.currentTimeMillis() + 5000L, beginTime));//延迟 5 秒执行
                queue.put(new DelayedDTO(System.currentTimeMillis() + 1000L * 10, beginTime));//延迟 10 秒执行
                System.out.println("end put");
            } catch (InterruptedException e) {
                System.out.println("" + e);
            }
        }
    }

    // 队列的消费者
    static class Consumer implements Runnable {
        private final BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println("Consumer begin");
                ((DelayedDTO) queue.take()).run();
                ((DelayedDTO) queue.take()).run();
                ((DelayedDTO) queue.take()).run();
                System.out.println("Consumer end");
            } catch (InterruptedException e) {
                System.out.println("" + e);
            }
        }
    }

    // 队列元素,实现了 Delayed 接口
    static class DelayedDTO implements Delayed {
        Long s;
        Long beginTime;

        public DelayedDTO(Long s, Long beginTime) {
            this.s = s;
            this.beginTime = beginTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(s - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }

        public void run() {
            System.out.printf("现在已经过了{%d}秒钟", (System.currentTimeMillis() - beginTime) / 1000);
            System.out.println();
        }
    }

    // demo 运行入口
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue q = new DelayQueue();
        DelayQueueDemo.Product p = new DelayQueueDemo.Product(q);
        DelayQueueDemo.Consumer c = new DelayQueueDemo.Consumer(q);
        new Thread(c).start();
        new Thread(p).start();
    }
}

打印出来的结果如下:
begin put
end put
Consumer begin
现在已经过了{2}秒钟
现在已经过了{5}秒钟
现在已经过了{10}秒钟
Consumer end

写这个代码的目的主要想演示一下延迟执行的例子,我们大概的思路是:

  1. 新建队列的元素,如DelayedDto,必须实现Delayed接口,我们在getDelay方法中实现了现在离过期时间还剩多久的方法。
  2. 定义队列元素的生产者和消费者,对应代码中的Product和Consumer。
  3. 对生产者和消费者就进行初始化和管理,对应我们的main方法。

虽然这只是一个简单的demo,但实际工作中,我们使用DelayQueue基本上就是这种思想,只不过写代码的时候会更加通用和周全,接下来我们来看下DelayQueue是如何实现put和take的。

3 放数据

我们以put为例,put调用的是offer的方法,offer的源码如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 上锁
    lock.lock();
    try {
        // 使用 PriorityQueue 的扩容,排序等能力
        q.offer(e);
        // 如果恰好刚放进去的元素正好在队列头
        // 立马唤醒 take 的阻塞线程,执行 take 操作
        // 如果元素需要延迟执行的话,可以使其更快的沉睡计时
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

可以看到其实底层使用到的是 PriorityQueue 的 offer 方法,我们来看下:

// 新增元素
public boolean offer(E e) {
    // 如果是空元素的话,抛异常
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    // 队列实际大小大于容量时,进行扩容
    // 扩容策略是:如果老容量小于 64,2 倍扩容,如果大于 64,50 % 扩容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    // 如果队列为空,当前元素正好处于队头
    if (i == 0)
        queue[0] = e;
    else
    // 如果队列不为空,需要根据优先级进行排序
        siftUp(i, e);
    return true;
}
// 按照从小到大的顺序排列
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        // k 是当前队列实际大小的位置
        while (k > 0) {
            // 对 k 进行减倍
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            // 如果 x 比 e 大,退出,把 x 放在 k 位置上
            if (key.compareTo((E) e) >= 0)
                break;
            // x 比 e 小,继续循环,直到找到 x 比队列中元素大的位置
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

可以看到,PriorityQueue的offer方法主要做了散件事情:

  1. 对新增元素进行判空;
  2. 对队列进行扩容,扩容策略和集合库容策略很相近;
  3. 根据元素的compareTo方法进行排序,我们希望最终排序的结果是从小到大的,因为我们想让队头的都是过期的数据,我们需要在compareTo方法里面实现: 通过每个元素的过期时间进行排序:
(int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));

这样便可实现越快过期的元素越能排到队头。
可以看到,新增数据时,只是使用到了compareTo方法,来对队列中的元素进行排序,接下来我们看下,取数据时,是如何操作的。

4 拿数据

取数据时,我们发现有元素的过期时间到了,就能拿出数据来,如果没有过期时间,那么线程就会一致阻塞,我们以take为例子,来看一下核心源码:

for (;;) {
    // 从队头中拿数据出来
    E first = q.peek();
    // 如果为空,说明队列中,没有数据,阻塞住
    if (first == null)
        available.await();
    else {
        // 获取队头数据的过期时间
        long delay = first.getDelay(NANOSECONDS);
        // 如果过期了,直接返回队头数据
        if (delay <= 0)
            return q.poll();
        // 引用置为 null ,便于 gc,这样可以让线程等待时,回收 first 变量
        first = null;
        // leader 不为空的话,表示当前队列元素之前已经被设置过阻塞时间了
        // 直接阻塞当前线程等待。
        if (leader != null)
            available.await();
        else {
          // 之前没有设置过阻塞时间,按照一定的时间进行阻塞
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
                // 进行阻塞
                available.awaitNanos(delay);
            } finally {
                if (leader == thisThread)
                    leader = null;
            }
        }
    }
}

可以看到阻塞等待的功能底层使用的是锁的能力。

以上演示的 take 方法是会无限阻塞,直到队头的过期时间到了才会返回,如果不想无限阻塞,可以尝试 poll 方法,设置超时时间,在超时时间内,队头元素还没有过期的话,就会返回 null。

5 总结

DelayQueue 是非常有意思的队列,底层使用了排序和超时阻塞实现了延迟队列,排序使用的是 PriorityQueue 排序能力,超时阻塞使用得是锁的等待能力,可以看出 DelayQueue 其实就是为了满足延迟执行的场景,在已有 API 的基础上进行了封装,我们在工作中,可以学习这种思想,对已有的功能能复用的尽量复用,减少开发的工作量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353