源码篇-ScheduledThreadPoolExecutor之DelayedWorkQueue

一、添加元素

public void put(Runnable e) {
    offer(e);
}

public boolean add(Runnable e) {
    return offer(e);
}

public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}
  • put方法和add方法都会调用offer方法,put方法没有返回值,add返回是否添加成功
  • 因为DelayedWorkQueue可以扩容,添加元素没有阻塞,所以带时间的offer方法最终调用的还是不带时间的offer方法
1.offer方法
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // 如果超过了数组的容量,执行扩容50%
        if (i >= queue.length)
            grow();
        // 数组元素个数加1
        size = i + 1;
        // 如果是第一个元素,直接设置值
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        // 如果不是第一个元素,需要向上比较并移动
        } else {
            siftUp(i, e);
        }
        // 如果是第一个元素,说明之前是空的,将leader置为空,通知等待获取队列数据的线程
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}
  • 首先看是否需要扩容,每次扩容1.5倍
  • 由于queue是小堆数据结构,如果是第一个元素,直接添加到数组中;如果不是第一个元素,需要与父结点比较并移动
  • 如果是第一个元素,需通知从队列获取数据的线程
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 获取父结点索引位置
        int parent = (k - 1) >>> 1;
        // 获取父结点元素
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果当前插入的元素延迟时间或者序列号大于父结点,直接退出while循环
        if (key.compareTo(e) >= 0)
            break;
        // 走到这,说明需要将新元素上移,将父元素替换到新元素的位置
        queue[k] = e;
        setIndex(e, k);
        // 将k的值更改为父元素的值,进行下一次循环
        k = parent;
    }
    // 重新插入新的元素
    queue[k] = key;
    // 设置元素在数组的位置
    setIndex(key, k);
}
  • DelayedWorkQueue是小堆树的数据结构,如果以0为下标开始编号,当前结点的在数组中的位置为i,那么当前结点的父结点在数组的(i-1)/2位置,左结点是2i+1的位置,右结点是2i+2的位置;
  • key是类ScheduledFutureTask,重写了compareTo方法,比较的是time,sequenceNumber,NANOSECONDS大小

二、获取元素

1. poll()
public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取小堆树的根结点
        RunnableScheduledFuture<?> first = queue[0];
        // 如果根结点为空或者时间还没到,则返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 返回根结点并重新平衡小堆树
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 元素个数减1
    int s = --size;
    // 获取最后一个元素
    RunnableScheduledFuture<?> x = queue[s];
    // 将最后一个元素置为空
    queue[s] = null;
    // 如果最后一个元素不是唯一的元素,那么放到根结点,并再平衡小堆树
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    // while循环,k<half说明还没到叶子结点,所以一直要比较
    while (k < half) {
        // 左结点索引
        int child = (k << 1) + 1;
        // 获取左结点元素
        RunnableScheduledFuture<?> c = queue[child];
        // 右结点索引
        int right = child + 1;
        // 因为结点向下移动,所以要与子节点的较小值比较,所以这里比较左结点元素与右结点元素大小
        // 如果右结点较小,将c变更为右结点
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 如果当前结点比子节点的较小值小,直接退出循环
        if (key.compareTo(c) <= 0)
            break;
        // 与较小的子结点调换位置,再循环
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}
  • 如果队列为空,或者时间还没到,则返回空
  • 返回的元素是第一个元素,将末尾元素放到一个位置,并向下再平衡
2. poll(long timeout, TimeUnit unit)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 队列为空
            if (first == null) {
                // 阻塞时间小于等于0,返回空
                if (nanos <= 0)
                    return null;
                // 有等待时间,那么就等待一段时间
                else
                    nanos = available.awaitNanos(nanos);
            // 队列不为空
            } else {                
                long delay = first.getDelay(NANOSECONDS);
                // 时间到了,返回第一个元素
                if (delay <= 0)
                    return finishPoll(first);
                // 时间没到,如果等待时间小于等于0,返回空
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 如果等待时间小于延迟时间
                // 或者
                // leader不为空,说明有leader线程
                // 那么等待一段时间
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                // 等待时间大于延迟时间且leader为空
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 因为等待时间大于延迟时间,所以这里只能等待延迟时间
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        // leader线程结束
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果没有leader线程且队列不为空,唤醒获取元素的线程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
  • 如果队列为空,等待时间小于等于0,直接返回空,如果有等待时间,那么就阻塞一段时间;
  • 如果队列不为空
    • 任务时间到了,返回队列第一个位置的元素
    • 任务时间没到,但是等待的时间小于等于0,那么返回空
    • 如果等待的时间小于任务执行的时间点,或者leader线程不为空,那么阻塞等待的时间
    • 如果等待时间大于任务执行的时间点且leader线程为空,那么阻塞任务执行的时间点
    • 最后如果leader线程不为空且队列不为空,那么通知获取对列元素的线程
3. take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 如果队列为空,一直阻塞
            if (first == null)
                available.await();
            // 队列不为空
            else {
                // 获取任务执行的延迟时间
                long delay = first.getDelay(NANOSECONDS);
                // 没有延迟时间,返回队列的元素
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // 如果leader线程不为空,阻塞当前线程
                if (leader != null)
                    available.await();
                // 如果leader线程为空,那么阻塞延迟的时间
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader线程为空且队列不为空,通知等待获取队列元素的线程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
  • 如果队列为空,一直阻塞,直至被唤醒
  • 如果队列不为空,到了任务的延迟时间,那么从队列里弹出任务;如果没到任务的延迟时间,那么阻塞任务,阻塞时间为任务的延迟时间,但是如果leader线程不为空,那么会一直阻塞,直至被唤醒
4. peek()
public RunnableScheduledFuture<?> peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return queue[0];
    } finally {
        lock.unlock();
    }
}
  • 直接返回第一个元素,不是弹出,不判断是否到了延迟时间
5. peekExpired()
private RunnableScheduledFuture<?> peekExpired() {
    // assert lock.isHeldByCurrentThread();
    RunnableScheduledFuture<?> first = queue[0];
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
        null : first;
}
  • 只返回到期的数据

三、删除元素

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取删除元素的索引
        int i = indexOf(x);
        if (i < 0)
            return false;

        // 将删除的元素索引设为-1
        setIndex(queue[i], -1);
        // 元素个数减1
        int s = --size;
        // 用数组尾部元素替换删除的结点
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        // 如果删除不是尾部元素
        if (s != i) {
            // 先向下平衡
            siftDown(i, replacement);
            // 如果向下平衡失败,也就是下面的都是大于当前结点,那么就想上平衡
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容