一、添加元素
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
- put方法和add方法都会调用offer方法,put方法没有返回值,add返回是否添加成功
- 因为DelayedWorkQueue可以扩容,添加元素没有阻塞,所以带时间的offer方法最终调用的还是不带时间的offer方法
1.offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果超过了数组的容量,执行扩容50%
if (i >= queue.length)
grow();
// 数组元素个数加1
size = i + 1;
// 如果是第一个元素,直接设置值
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
// 如果不是第一个元素,需要向上比较并移动
} else {
siftUp(i, e);
}
// 如果是第一个元素,说明之前是空的,将leader置为空,通知等待获取队列数据的线程
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
- 首先看是否需要扩容,每次扩容1.5倍
- 由于queue是小堆数据结构,如果是第一个元素,直接添加到数组中;如果不是第一个元素,需要与父结点比较并移动
- 如果是第一个元素,需通知从队列获取数据的线程
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 获取父结点索引位置
int parent = (k - 1) >>> 1;
// 获取父结点元素
RunnableScheduledFuture<?> e = queue[parent];
// 如果当前插入的元素延迟时间或者序列号大于父结点,直接退出while循环
if (key.compareTo(e) >= 0)
break;
// 走到这,说明需要将新元素上移,将父元素替换到新元素的位置
queue[k] = e;
setIndex(e, k);
// 将k的值更改为父元素的值,进行下一次循环
k = parent;
}
// 重新插入新的元素
queue[k] = key;
// 设置元素在数组的位置
setIndex(key, k);
}
- DelayedWorkQueue是小堆树的数据结构,如果以0为下标开始编号,当前结点的在数组中的位置为i,那么当前结点的父结点在数组的(i-1)/2位置,左结点是2i+1的位置,右结点是2i+2的位置;
- key是类ScheduledFutureTask,重写了compareTo方法,比较的是time,sequenceNumber,NANOSECONDS大小
二、获取元素
1. poll()
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取小堆树的根结点
RunnableScheduledFuture<?> first = queue[0];
// 如果根结点为空或者时间还没到,则返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
// 返回根结点并重新平衡小堆树
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 元素个数减1
int s = --size;
// 获取最后一个元素
RunnableScheduledFuture<?> x = queue[s];
// 将最后一个元素置为空
queue[s] = null;
// 如果最后一个元素不是唯一的元素,那么放到根结点,并再平衡小堆树
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
// while循环,k<half说明还没到叶子结点,所以一直要比较
while (k < half) {
// 左结点索引
int child = (k << 1) + 1;
// 获取左结点元素
RunnableScheduledFuture<?> c = queue[child];
// 右结点索引
int right = child + 1;
// 因为结点向下移动,所以要与子节点的较小值比较,所以这里比较左结点元素与右结点元素大小
// 如果右结点较小,将c变更为右结点
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果当前结点比子节点的较小值小,直接退出循环
if (key.compareTo(c) <= 0)
break;
// 与较小的子结点调换位置,再循环
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
- 如果队列为空,或者时间还没到,则返回空
- 返回的元素是第一个元素,将末尾元素放到一个位置,并向下再平衡
2. poll(long timeout, TimeUnit unit)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 队列为空
if (first == null) {
// 阻塞时间小于等于0,返回空
if (nanos <= 0)
return null;
// 有等待时间,那么就等待一段时间
else
nanos = available.awaitNanos(nanos);
// 队列不为空
} else {
long delay = first.getDelay(NANOSECONDS);
// 时间到了,返回第一个元素
if (delay <= 0)
return finishPoll(first);
// 时间没到,如果等待时间小于等于0,返回空
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 如果等待时间小于延迟时间
// 或者
// leader不为空,说明有leader线程
// 那么等待一段时间
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 等待时间大于延迟时间且leader为空
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 因为等待时间大于延迟时间,所以这里只能等待延迟时间
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
// leader线程结束
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果没有leader线程且队列不为空,唤醒获取元素的线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- 如果队列为空,等待时间小于等于0,直接返回空,如果有等待时间,那么就阻塞一段时间;
- 如果队列不为空
- 任务时间到了,返回队列第一个位置的元素
- 任务时间没到,但是等待的时间小于等于0,那么返回空
- 如果等待的时间小于任务执行的时间点,或者leader线程不为空,那么阻塞等待的时间
- 如果等待时间大于任务执行的时间点且leader线程为空,那么阻塞任务执行的时间点
- 最后如果leader线程不为空且队列不为空,那么通知获取对列元素的线程
3. take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空,一直阻塞
if (first == null)
available.await();
// 队列不为空
else {
// 获取任务执行的延迟时间
long delay = first.getDelay(NANOSECONDS);
// 没有延迟时间,返回队列的元素
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 如果leader线程不为空,阻塞当前线程
if (leader != null)
available.await();
// 如果leader线程为空,那么阻塞延迟的时间
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader线程为空且队列不为空,通知等待获取队列元素的线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- 如果队列为空,一直阻塞,直至被唤醒
- 如果队列不为空,到了任务的延迟时间,那么从队列里弹出任务;如果没到任务的延迟时间,那么阻塞任务,阻塞时间为任务的延迟时间,但是如果leader线程不为空,那么会一直阻塞,直至被唤醒
4. peek()
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
- 直接返回第一个元素,不是弹出,不判断是否到了延迟时间
5. peekExpired()
private RunnableScheduledFuture<?> peekExpired() {
// assert lock.isHeldByCurrentThread();
RunnableScheduledFuture<?> first = queue[0];
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
三、删除元素
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取删除元素的索引
int i = indexOf(x);
if (i < 0)
return false;
// 将删除的元素索引设为-1
setIndex(queue[i], -1);
// 元素个数减1
int s = --size;
// 用数组尾部元素替换删除的结点
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 如果删除不是尾部元素
if (s != i) {
// 先向下平衡
siftDown(i, replacement);
// 如果向下平衡失败,也就是下面的都是大于当前结点,那么就想上平衡
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}