简介
DelayQueue 是JDK中提供的延时队列,内部封装优先级队列,并且提供空阻塞功能。DelayQueue中所有元素必须实现Delayed接口getDelay方法,此方法返回剩余有效时间。
DelayQueue 类
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
DelayQueue 继承AbstractQueue抽象类,实现BlockingQueue接口,元素必须实现实现Delayed接口。
Delayed 接口
public interface Delayed extends Comparable<Delayed>
Delayed 接口继承Comparable接口,所有子类都具有比较功能
Delayed 方法
// 返回剩余到期时间
long getDelay(TimeUnit unit);
DelayQueue 属性
// 锁对象
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 头线程
private Thread leader = null;
// 条件
private final Condition available = lock.newCondition();
通过属性就能看出,他是通过优先级队列实现,快到期的排前面,每次取优先级队列头,看先是否到期。顺便说一下,优先级队列我们是当无界队列的,所以延时队列也可以称为无界队列。
DelayQueue 构造函数
// 空参构造函数
public DelayQueue() {}
// 使用线性集合初始化
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
DelayQueue 添加
// 添加,无界队列不考虑满
public boolean add(E e) {
// 调用offer
return offer(e);
}
// 添加元素,无界队列不考虑满
public void put(E e) {
// 调用offer
offer(e);
}
// 阻塞添加,无界队列忽略阻塞
public boolean offer(E e, long timeout, TimeUnit unit) {
// 调用offer
return offer(e);
}
实际添加方法offer
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 元素加入优先级队列
q.offer(e);
// 获取优先级头元素,头元素等于当前元素
// 清空leader,并放开读限制
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 释放锁
lock.unlock();
}
}
DelayQueue 出队
出队,为空返回null
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取优先级队列头节点
E first = q.peek();
// 判断头节点是否为空,不为空判断剩余过期时间
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 剩余过期时间小于0,优先级队列出队
return q.poll();
} finally {
// 解锁
lock.unlock();
}
}
出队,为空阻塞
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 获取优先级队列头节点
E first = q.peek();
// 优先级队列为空
if (first == null)
// 阻塞
available.await();
else {
// 判断头元素剩余时间是否小于等于0
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 优先级队列出队
return q.poll();
// 到这,说明剩余时间大于0
// 头引用置空
first = null;
// leader线程是否为空,不为空就等待
if (leader != null)
available.await();
else {
// 设置leader线程为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 休眠剩余秒
available.awaitNanos(delay);
} finally {
// 休眠结束,leader线程还是当前线程
// 置空leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader线程为空,并且first不为空
// 唤醒阻塞的leader,让它再去试一次
if (leader == null && q.peek() != null)
available.signal();
// 解锁
lock.unlock();
}
}
出队,空时阻塞,超时退出
public E poll(long timeout, TimeUnit unit)
throws InterruptedException {
// 获取剩余时间
long nanos = unit.toNanos(timeout);
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 获取优先级队列头
E first = q.peek();
// 头为空
if (first == null) {
// 剩余时间小于等于0返回空
// 否则继续等待
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
// 获取头剩余过期时间
long delay = first.getDelay(NANOSECONDS);
// 剩余时间小于等于0,优先级队列出队
if (delay <= 0)
return q.poll();
// 等待时间小于等于0,返回null
if (nanos <= 0)
return null;
// 到这,说明头元素过期时间大于0
// 头引用置空
first = null;
// leader不为空,等待时间短的
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 设置leader为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待剩余时间
long timeLeft = available.awaitNanos(delay);
// 计算剩余超时间(防止恶意唤醒)
// (delay - timeLeft)计算等待了多长时间
nanos -= delay - timeLeft;
} finally {
// 休眠结束,leader线程还是当前线程
// 置空leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader线程为空,并且first不为空
// 唤醒阻塞的leader,让它再去试一次
if (leader == null && q.peek() != null)
available.signal();
// 解锁
lock.unlock();
}
}
DelayQueue 查询
public E peek() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查询优先级队列头元素
return q.peek();
} finally {
// 解锁
lock.unlock();
}
}
DelayQueue 获取长度
public int size() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取优先级队列中元素个数
return q.size();
} finally {
// 解锁
lock.unlock();
}
}
DelayQueue 清空
public void clear() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 清空优先级队列
q.clear();
} finally {
// 解锁
lock.unlock();
}
}