DelayQueue源码学习总结

为了加深印象,边看源码边写总结吧,我写的不好,如果你万一搜到这篇,可以马上关闭了。去看其他的文章哈。

1 DelayQueue的类信息及内部属性:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

1.通过类信息我们发现,DelayQueue继承了AbstractQueue和实现了BlockingQueue,也就是说具备阻塞队列的所有属性。
2.类定义上家了泛型<E extends Delayed>,也就说他只接收实现Delayed接口的元素。
3.lock 锁应用的是ReentrantLock,可重入锁,它所有的入队出队等等操作用的都是用这个锁。
4.PriorityQueue<E> q:优先队列,用这个队列存储元素,优先队列就是基于最小二叉堆的实现,底层存储方式是数组,可以看看PriorityQueue的源码,注意这句,children of queue[n] are queue[2n+1] and queue[2(n+1)].意思是数组n上的左子树和右子树的位置分别是2*n+1和2*n+2,也就是这个树是通过数组实现的。你要写堆排序或者top k的的时候,可以借鉴他的源码。
想了解PriorityQueue,请参考PriorityQueue介绍,想了解堆排序,请参考堆排序
好了,用优先队列的目的我们就知道了,就是想用他的每次get出来的都是最小元素的功能白。
5.Thread leader 这个在注释里面着重介绍了是Leader-Follower模式,其实目的就是解决线程争用的问题,这个后面分析具体代码的时候再说吧。
6.Condition available:大家应该都知道,就是等待某个条件被唤醒白,ReentrantLock+Condition的用法如果不明白,建议找个源码解读,对着源码一起理解理解,这里放个经典的图:

image.png

2 代码解读

2.1 offer方法

反正不管是add、put、offer(E e, long timeout, TimeUnit unit)调用的都是offer(e)这个方法。就看这个就行了,下面贴下源代码:

/**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e); //调用PriorityQueue的offer方法
            if (q.peek() == e) {
                leader = null;
                available.signal(); //唤醒condition上的线程
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

--------下面是PriorityQueue的代码---------------
    /**
     * Inserts the specified element into this priority queue.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be
     *         compared with elements currently in this priority queue
     *         according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1); //扩容
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e); //调整二叉小顶堆结构
        return true;
    }

   /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     *
     * @param k the position to fill
     * @param x the item to insert
     */
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x); //如果设置了comparator 就用设置的比较方法
        else
            siftUpComparable(k, x); //如果没有设置就x定义的comparator 方法
    }

    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1; //为啥这么一个技术公式就能计算出来父节点
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

代码很简单,就是先要抢锁(抢不到会怎么办?需要你去看ReentrantLock源码哦)执行,然后调用PriorityQueue的offer方法去调整堆结构了。
好了,我们分析下这个调整方法siftUp,非常有意思,
1.1 最小堆首先是一个二叉树,父节点的值要小于他的左子节点和右子节点,用数组表示,假设节点在数组的下标是n,那它的左右子节点再数组的位置分别为2n+1和2n+2。
1.2 每次增加元素时,都是放到数组的最后一个位置,那这个位置肯定是某个节点左子节点或者右子节点。
1.3 现在我们已知子节点的位置了,我们要找出来父节点的位置,完了比较下大小,调整堆结构,把每个值都放到他合理的位置上去。
完了咱们看代码,咦,这代码怎么看也不像是把元素放到最后一个位置,完了交换啊。反正我比较笨,看了好久才看懂,尤其是这句: int parent = (k - 1) >>> 1; 大家知道>>>相当于无符号右移,相当于除2,k传入的是数组当前的长度size,就是要插入元素的位置,可是这个位置既有可能是左子节点,又有可能是右子节点,怎么可能这一个计算公式就算出来他的父节点呢?其实这个跟int型的除法机制有关系,我们知道,子节点的位置要不是在偶数节点上,要不再奇数几点上,举个例子:假设节点位置是3,那他的左子节点就是2*3+1=7,右子节点是2*3+2=8,现在开始根据代码里面的算法反推,(7-1)/2=3,(8-1)/2=3.5,注意我们是int型的,那后面这个得到的结果也应该是3.也就说通过这一个公式就能获得父节点了,哈哈哈。
剩下的代码就简单了,就是比较当前父节点和子节点的大小,把要插入的元素调整到合适的位置上去。

2.2 take方法

取出的看take方法吧,其他的poll、poll(long timeout, TimeUnit unit)实现逻辑上都差不多,贴源码:

 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //可中断的锁
        try {
            for (;;) {
                E first = q.peek(); //取root  
                if (first == null)
                    available.await(); //没有当前线程wait
                else {
                    long delay = first.getDelay(NANOSECONDS); //取得时间差值 ---getDelay是插入的对象实现的方法
                    if (delay <= 0)
                        return q.poll(); //符合条件  从优先队列里面取出root节点 返回  代码在下面贴出来了
                    first = null; // don't retain ref while waiting
                    if (leader != null) //当前线程不是leader,wait住,保证只有一个线程在搞事情
                        available.await();
                    else {  //队列中数据既不为空  root节点还不符合条件 leader ==null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread; //设置当前线程为leader
                        try {
                            available.awaitNanos(delay); //当前线程休眠指定的时间
                        } finally { //return前执行的方法
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal(); //当前线程处理结束 并且队列的数据不空 唤醒condition上等待的一个线程
            lock.unlock();
        }
    }

--------下面是PriorityQueue的代码---------------
    @SuppressWarnings("unchecked")
    public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0]; //取root节点
        E x = (E) queue[s]; //取最后一个子节点
        queue[s] = null;
        if (s != 0)
            siftDown(0, x); //重新排序
        return result;
    }

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     */
    private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x); //如果设置了comparator 就用设置的比较方法
        else
            siftDownComparable(k, x); //如果没有设置就x定义的comparator 方法
    }

    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;  //计算非叶子节点数      // loop while a non-leaf
        while (k < half) { //k=0 开始的循环
            int child = (k << 1) + 1; // assume left child is least 找到他的左子树
            Object c = queue[child];
            int right = child + 1; //找到右子树
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) //左子树比右子树大
                c = queue[child = right]; //定位到右子树
            if (key.compareTo((E) c) <= 0) //要找到位置的值跟左右子树中小的那个比较,如果小 就退出
                break;
            queue[k] = c;//把小的那个节点向上层移动
            k = child;//移动的位置空了哈,作为下一次循环的起点
        }
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

我把源码统一注释了一遍,总体来说,就是你要先抢到锁,才能有机会去取最小堆的root,没有满足条件数据的话,就要看你是不是leader了,如果是,你就休眠有限的时间,再去被唤醒取得资源,如果你不是leader,不好意思,你只能进入condition队列,等待被唤醒了。
那什么情况下,非leader线程会抢到锁呢?这个还是需要你先理解 await()方法,在condition的await方法里面,是先要释放锁,完了才通过LockSupport.park方法去wait住当前线程。既然释放了锁,那其他线程都有机会抢锁了。也就是说,正是由于await的机制,我们会造成线程的争用,所以作者搞了个Leader-Follower机制来避免过多的线程频繁抢锁争用资源。
对于PriorityQueue的siftDown,注释里面也注释了一遍,我在看的过程中恍惚记起了大学是老师讲二叉树的时候的各种简便的计算方式,比如size/2就是非叶子节点数。。。可以看我引用的那篇文章,自己画一下二叉树和数组,对着代码自己琢磨琢磨相信你会有收获的。

1 总结一下

1.1 用了ReentrantLock和Condition,所以你要先了解这两个东西,代码才好分析。并发包的的东西都是一环套一环的啊,下面有个图,顺着这个图撸撸代码,会有不小的收获。


image.png

1.2 用了PriorityQueue,优先队列,二叉小顶堆,堆顶是最小的数据,通过这个队列,我们对入队的元素进行排序,root元素出队后重排序。
1.3 入队的对象必须是实现Delay接口,也就是说必须实现long getDelay(TimeUnit unit);int compareTo(T o);两个方法,即延时方法和比较方法需要子类实现。

一句话总结:延时队列就是基于优先队列、ReentrantLock、Condition实现延时效果的队列。==等于没说

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容