DelayQueue阻塞队列系列文章
DelayQueue阻塞队列第一章:代码示例
DelayQueue阻塞队列第二章:源码解析
介绍
DelayQueue是java并发包中提供的延迟阻塞队列,业务场景一般是下单后多长时间过期,定时执行程序等
1-DelayQueue的组成结构
/**
* DelayQueue队列继承了AbstractQueue,并且实现BlockingQueue的方法
*/
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//使用ReentrantLock进行线程的同步
private final transient ReentrantLock lock = new ReentrantLock();
//使用优先级队列PriorityQueue作为存放数据的队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//使用leader/follower模式来避免多线程性能的消耗
private Thread leader = null;
//使用Condition等待队列来保存请求的线程(l/f模式)
private final Condition available = lock.newCondition();
DelayQueue中的元素需要实现Delayed接口,重写getDelay()和compareTo()方法,其中getDelay()方法是为了获取队列元素延迟剩余时间,compareTo()方法是为了对队列中的元素进行一个排序,使符合条件的元素排在队列的最前面
DelayQueue内部的实现基本就是依靠重写BlockingQueue方法,使用ReentrantLock进行同步操作,使用PriorityQueue存放队列元素,Condition存放访问线程
DelayQueue内部采用了leader/follower设计模式,旨在减小多线程的消耗,本文不详细介绍
2源码实现细节
offer方法:将元素加入到延迟队列中去
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//元素加入优先级队列
q.offer(e);
//如果新加入的元素e就是队列的头元素,将leader置为null切唤醒等待线程
if (q.peek() == e) { //Q1此处为何要获取队列头节点元素并与新加入元素进行比较
leader = null; //Q2为何要将leader线程置为null且唤起等待队列
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
offer方法比较简单,只针对以上两处做详细说明
Q1和Q2两个操作都是为了解决一个问题,就是leader对应队列首节点元素的问题,因为元素是不断在加入的,比如,leader对应需要取出的首节点是A,此时A虽然是首节点元素,但是还没有到达延迟时间,所以leader还在等待A,他们的关系是对应的(对应关系的逻辑参考take()源码),那么此时加入了元素B,这时候元素B排在了队首,那么此时需要处理元素B的就不再是当前的leader了,所以我们需要将leader置空,重新选取新的leader来处理这个B,至于之前的leader线程,在take源码中,在调用available.awaitNanos(delay)后,当时间到了会重新获取锁然后执行操作
所以我们要首先判断加入的新元素是否是首节点,以便确定对应线程的处理关系
绝大多数的文章对源码中为什么进行if (q.peek() == e)和leader = null的操作的原因只字不提,我觉得还是有必要写下的,我对于此处原因的理解可能也存在偏差,希望各位不吝赐教
take方法:取出元素并处理元素事件
/**
* 首先获取优先队列的首个元素,如果为空则调用线程沉睡。
* 如果优先级队列不为空,查看当前首元素是否到达过期时间,到达过期时间了就获取并移除队列
* 如果没有到达过期时间,将first变量置为null(防止内存泄漏),如果leader线程不为空则进入等待队列
* 如果leader为空,则当前线程为leader,并限时进入等待队列中进行等待
* 如果leader为空,队列中还有元素存在,则唤醒所有等待的follower线程
* 继续循环,直到获取延时队列中的元素
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //获取优先队列中的首个节点
if (first == null) //如果优先队列中没有节点,则该线程进入等待线程
available.await();
else { //如果首节点不为空
long delay = first.getDelay(NANOSECONDS); //获取当前元素还需要延时多长时间
if (delay <= 0) //如果延时时间小于或是等于0,则移出队列
return q.poll();
first = null; // don't retain ref while waiting防止内存泄漏
if (leader != null) //说明leader线程正在工作,当前线程就进入等待队列中
available.await();//当前线程转变为follower线程
else { //如果首节点不为空,延时时间还没到,没有相应的处理线程
Thread thisThread = Thread.currentThread(); //获取当前线程
leader = thisThread; //当前线程设置为首线程
try {
available.awaitNanos(delay); //限时进入等待队列中处理延时时间最小的元素,并释放锁
} finally {
if (leader == thisThread)
leader = null; //执行事件之后,将leader线程置为null让给其他线程
}
}
}
}
} finally {
if (leader == null && q.peek() != null) //如果leader线程为null,优先级队列中还有元素,则唤醒通知队列中的线程
available.signal();
lock.unlock();
}
}
take方法是DelayQueue的核心方法,获取延迟队列中的元素,检索并移除这个队列的头部,等待直到这个队列的过期元素可用
关于源码的疑惑,不将first=null为什么会导致内存泄露?
核心点在于leader调用await方法时会释放锁,比如,当线程A获取了first,然后将当前线程设为leader线程,接着进入await方法,释放锁,这时线程B也获取了first,因为leader != null,所以进入阻塞队列,这时线程A从等待队列中返回,获取对象释放first,但由于线程B中依然有first的引用,所以gc无法对first进行回收,导致内存的泄露
在DelayQueue还有很多值得研究的源码和问题,我在日后也会慢慢的加上来,第一次写先写这么多吧,不足之处希望可以共同讨论进步!
- 如果我的文章确实有帮助到你,请不要忘了点一下文末的喜欢
- 技术理解不到或有错误请直(bu)接(yao)指(ma)出(wo)
- 写作不易!