J.U.C 阻塞队列源码剖析系列(终结篇)之 DelayQueue

上一篇文章剖析了 PriorityBlockingQueue 的相关源码,那这篇文章接着看另外一个常见的阻塞队列 —— DelayQueue

简介

DelayQueue是一个延迟处理的队列。其内部是复用PriorityQueue类具有对过期的对象优先处理的能力;因为PriorityQueue对象的数据结构是二叉堆,所以我们也可以认为 DelayQueue 对象的数据结构也是二叉堆。值得注意的是,千万不要以为整个二叉堆都是排序的,实际上除了堆头元素之外其余的元素都是无序的。不清楚PriorityQueue源码的朋友,可参考我另外一篇拙作Java 队列之 PriorityQueue 源码分析

依据我的习惯,学习源码之前先写个demo,了解大概的用法先!
示例:

public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<DelayDemo> delayQueue = new DelayQueue();
        long beginTime = System.currentTimeMillis();
        new Thread(() -> {
            // 2秒后过期
            delayQueue.add(new DelayDemo(beginTime + 2000L, beginTime));
        }, "线程一").start();
        new Thread(() -> {
            try {
                delayQueue.take().show();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程二").start();
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                delayQueue.take().show();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程三").start();
    }
}

class DelayDemo implements Delayed {

    private Long beginTime;

    private Long now;

    public DelayDemo() {
    }

    public DelayDemo(Long now, Long beginTime) {
        this.now = now;
        this.beginTime = beginTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(now - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public void show(){
        System.out.println(Thread.currentThread().getName() + " 执行");
    }
}
线程二 执行

源码剖析

类注释

  • 无界限的队列
  • 队列的元素必须是 Delayed 的子类
  • 当队列元素过期才能被 take 方法调用
  • 越靠近队列头部的元素越快过期
  • 队列不允许有空元素
  • 没有过期的元素或者队列为空,使用 poll 方法会返回null
  • 实现了 Collection 和 Iterator 接口,但无法使用 iterator 方法迭代队列的元素

成员变量

    // 可重入锁,用于线程安全
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先队列,用于存储元素
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    private Thread leader = null;

    // 用于实现阻塞的Condition对象
    private final Condition available = lock.newCondition();

构造方法

    public DelayQueue() {}

    // 传入集合对象初始化DelayQueue对象。传入的集合对象必须是 Delayed 的子类
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

添加元素

    // 添加元素
    public boolean add(E e) {
        return offer(e);
    }

    // 添加元素
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 查看元素是否为队首
            if (q.peek() == e) {
                // 设置leader为空
                leader = null;
                // 唤醒所有等待的队列
                available.signal();
            }
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    // 添加元素
    public void put(E e) {
        offer(e);
    }

    // 添加元素
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

可以发现,添加元素的核心方法是offer(E e)。在添加元素之前加锁保证线程安全,然后调用 PriorityQueue 对象添加元素,由于 DelayQueue 对象没有限定队列容量,所以可认为无限大,添加元素的时候不会检查容量大小,直到发生OOM。

查看元素

    public E poll() {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            E first = q.peek();
            // first == null:队列为空,返回null
            // first.getDelay(NANOSECONDS) > 0:说明第一个元素还没到超时时间,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();// 返回第一个元素的值
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 第一个元素
                E first = q.peek();
                // 如果队列为空
                if (first == null) {
                    // 如果达到方法超时时间,返回null
                    if (nanos <= 0)
                        return null;
                    else
                        // 阻塞等待
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    // 队列第一个元素已过期,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 执行poll方法已超时,返回null
                    if (nanos <= 0)
                        return null;
                    // 等待的时候,释放first的引用,避免内存泄漏
                    first = null;
                    // nanos < delay:poll方法超时时间小于队列第一个元素过期时间,则阻塞等待
                    // leader != null:队列第一个元素被其它线程占有,则阻塞等待
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {// 执行到这里说明队列第一个元素没有被其它线程占有,而且当队列第一个元素过期后,poll方法还没超时,则往下执行计算poll方法剩余时间并重新自旋后出队
                        Thread thisThread = Thread.currentThread();
                        // 设置第一个元素为当前线程占有
                        leader = thisThread;
                        try {
                            // 计算队列第一个元素过期后,poll方法的超时时间余下的时间
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 队列第一个元素没有被线程占有而且队列不为空,唤醒所有等待线程
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队首元素
                E first = q.peek();
                // 队首为空,则阻塞当前线程
                if (first == null)
                    available.await();
                else {
                    // 获取队首元素的过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 已过期,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 等待的时候,释放first的引用,避免内存泄漏
                    first = null;
                    // leader != null表明有其他线程在操作,阻塞当前线程
                    if (leader != null)
                        available.await();
                    else {
                        // leader指向当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待阻塞,直到first元素过期
                            available.awaitNanos(delay);
                        } finally {
                            // 释放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }
    
    // 无论队列第一个元素是否超时都会被读取,而且不会删除队列元素
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

查看队列元素的方法有三个:peek、poll 和 take 方法。

  1. peek方法最简单,直接调用 PriorityQueue.peek 方法获取队列第一个元素,但不删除;另外两个方法,只要读取到元素就会删除队列第一个元素。
  2. poll方法的参数为空,如果队列第一个元素为空或是否过期就会返回null,否则就会读取并删去元素。如果poll方法参数指定了等待时间,则会在读取元素之前根据方法等待时间和元素本身的过期时间,只有队列不为空,第一个元素被当前线程占有后,元素过期且方法指定的等待时间未超时则会读取并删除第一个元素,否则会一直阻塞直到方法等待时间超时。
  3. 至于take方法的流程与poll方法加上等待时间的流程差不多,如果线程加锁成功后队列第一个元素first已过期则直接读取并删除first,否则会判断是否已有线程占有first元素,如果有其它线程占有,则当前线程执行await操作,否则设置当前线程占有first元素并阻塞等待直到first元素过期,线程一旦达到过期时间就会自旋并读取删除first元素。

take方法源码为例,画出以下流程以帮助理解源码:

删除元素

    // 无论是否过期都会删除指定的元素
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

常用方法

    // 返回队列元素数量
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
    
    // 清空队列所有的元素
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    // 返回队列的剩余容量,总是会返回Integer的最大值
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    // 将队列转为Object数组返回
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    // 返回指定类型的数组
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

总结

其实不难看出,DelayQueue 类的实现其实就是通过ReentrantLock锁保证线程安全,要求队列元素实现Delayed接口和复用PriorityQueue类达到延迟和排序的能力!给我最大的收获是,要学会复用的思想,比方说我们可以复用已有的 API 实现一些自研的程序或二次开发,这种思想将会提高开发效率和降低开发的工作量。但是大家有没有想过,DelayQueue 有个缺点,如果数据放在延迟队列中,数据还没过期,忽然宕机了怎么办?因为数据是放在内存中,如果没有持久化的话,宕机后就会丢数据!所以说,使用 DelayQueue 设置过期数据不宜时间太久,会有丢数据的风险!常用的消息队列中间件,如:RocketMQ、Kafka等,其实会考虑到宕机丢数据的情况,所以都会有刷盘机制降低丢数据的风险。

参考资料:

Java 队列之 PriorityQueue 源码分析

慕课网:面试官系统精讲Java源码及大厂真题:https://www.imooc.com/read/47/article/863

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容