阻塞队列接口
BlockingQueue<E>的方法
既然是队列,说明遵循先进先出的规则(FIFO),肯定有入队和出队的方法,看了一下注释,有几种不同的出队入队方法,下面列举了一下:
| | 抛异常 | 返回值(null/true/false) | 阻塞 | 指定超时时间 |
|:------:|:-------- :|:---------:|:--------:|:-----------------------:|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 删除 | remove() | poll() | take() | poll(time,unit) |
| 检查 | element() | peek() | | |
BlockingQueue的常见实现类
- ArrayBlockingQueue<E>
底层使用一个Object数组保存元素,自然能想到在初始化的时候必须指定最大容量。
//fair表示是否需要公平性,默认关闭,建议使用默认值
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化一个数组
this.items = new Object[capacity];
//创建ReentrantLock实例
lock = new ReentrantLock(fair);
//创建一个非空的条件对象
notEmpty = lock.newCondition();
//创建一个非满的条件对象
notFull = lock.newCondition();
}
//入列
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;//putIndex表示当前入队的元素索引
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//通知因notEmpty条件不满足阻塞的线程解除阻塞
}
//出列
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//takeIndex表示当前出列索引
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//通知因notFull条件不满足阻塞的线程解除阻塞
return x;
}
//添加元素
public void put(E e) throws InterruptedException {
checkNotNull(e);//检查元素非空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获取锁
try {
while (count == items.length) //队列满了
notFull.await();//当前线程进入条件等待集,直到其他线程调用notFull.signal方法解除阻塞
enqueue(e); //入列
} finally {
lock.unlock(); //解锁
}
}
//删除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) //队列为空
notEmpty.await(); //当前线程进入条件等待集,直到其他线程调用notEmpty.signal方法解除阻塞
return dequeue(); //出列
} finally {
lock.unlock();
}
}
因为ArrayBlockingQueue是基于数组实现,初始化时会指定数组大小,传入参数不能是null,否则报NullPointerException。
通过分析源码,ArrayBlockingQueue的实现核心是使用ReentrantLock(可重入互斥锁),为什么叫可重入呢?是因为线程是可以重复获得已经持有的锁。锁持有一个计数(hold count)来跟踪lock方法的嵌套调用。线程每调用一次lock方法后都要调用unlock来释放锁
指定超时时间的两个方法offer和poll的实现原理是分别调用对应条件的awaitNanos(nanos),这个方法是指定等待时间,超过这个时间当前线程会自动唤醒,这这段时间内,其他线程调用了同种条件的signal或signalAll也会唤醒当前线程
LinkedBlockingQueue<E>
- 基于单向链表(item,next)实现,默认容量最大为Integer.MAX_VALUE,也可以初始化是指定容量大小。
- 与此类似的还有一个LinkedBlockingDeque,它里面维护的是一个双端链表(item,prev,next),它可实现先进后出(FILO)