在Java中,BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于take与put操作的原理,却是类似的。
阻塞与非阻塞
-
入队
offer(E e):如果队列没满,立即返回true;如果队列满了,立即返回 false --> 不阻塞
put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中 断 --> 阻塞
-
出队
poll():如果没有元素,直接返回null;如果有元素,出队 --> 不阻塞
take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断 --> 阻塞
一、LinkedBlockingQueue
LinkedBlockingQueue可以指定容量,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。
1、底层数据结构
LinkedBlockingQueue内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。如果队列为空时,头节点的next参数为null。尾节点的next参数也为null。
2、主要变量
// 容量限制,如果没有指定,则为 Integer.MAX_VALUE
private final int capacity;
// 当前队列中的元素数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,始终满足head.item == null
transient Node<E> head;
// 队列的尾节点,始终满足last.next == null
private transient Node<E> last;
// 由 take、poll 等持有的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();
// 由 put、offer 等持有的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();
3、put(E e)方法
put(E e)方法用于将一个元素插入到队列的尾部。在队列满时会阻塞,直到队列中有元素被取出。
// 在此队列的尾部插入指定元素,如有必要,等待空间可用。
public void put(E e) throws InterruptedException {
// 不允许元素为null
if (e == null) throw new NullPointerException();
int c = -1;
// 以当前元素新建一个节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 获得入队的锁
putLock.lockInterruptibly();
try {
// 如果队列已满,那么将该线程加入到Condition的等待队列中
while (count.get() == capacity) {
notFull.await();
}
// 当队列未满,然后有出队线程取出导致,将节点入队
enqueue(node);
// 得到插入之前队列的元素个数。getAndIncrement返回的是 +1 前的值
c = count.getAndIncrement();
// 如果还可以插入元素,那么释放等待的入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放入队的锁
putLock.unlock();
}
// 如果插入队列之前元素个数为0,插入后就通知出队线程队列非空
if (c == 0)
signalNotEmpty();
}
put方法总结:
1、LinkedBlockingQueue不允许插入的元素为null;
2、同一时刻只有一个线程可以进行入队操作,putLock在将元素插入队列尾部时加锁了;
3、如果队列满了,则会调用notFull.await(),将该线程加入到Condition等待队列中。await方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;
4、一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部;
5、然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知notFull条件的等待队列中的线程;
6、如果插入队列前个数为0,那现在插入后,就为1了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。
4、E take()方法
take()方法用于得到队头的元素,在队列为空时会阻塞,直到队列中有元素可取。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取takeLock锁
takeLock.lockInterruptibly();
try {
// 如果队列中元素数量为0,则将出队线程加入到notEmpty队列中进行等待
while (count.get() == 0) {
notEmpty.await();
}
// 得到到队头的元素
x = dequeue();
// 得到出队列前元素的个数。getAndDecrement返回的是 -1 前的值
c = count.getAndDecrement();
// 如果出队列前的元素数量大于1,那说明还可以继续取,那就释放在notEmpty队列的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放出队锁
takeLock.unlock();
}
// 如果出队前队列是满的,那现在取走一个元素了,队列就不满了,就可以去通知等待中的入队线程了。
if (c == capacity)
signalNotFull();
return x;
}
take方法总结:
1、同一时刻只有一个线程可以进行出队操作,takeLock在出队之前加锁了;
2、如果队列中元素为空,那就进入notEmpty队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下notEmpty队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在notFull队列中等待插入的线程进行put了。
5、remove()方法
remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回 false ;有的话则删除并返回true。入队和出队都是只获取一个锁,而 remove()方法需要同时获得两把锁,
public boolean remove(Object o) {
// 因为队列不包含null元素,返回false
if (o == null) return false;
// 获取两把锁
fullyLock();
try {
// 从头的下一个节点开始遍历
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果匹配,那么将节点从队列中移除,trail表示需要删除节点的前一节点
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 释放两把锁
fullyUnlock();
}
}
/**
* 锁定以防止 put 和 take.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* 解锁以允许 put 和 take.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
6、总结
LinkedBlockingQueue允许两个线程同时在两端进行入队和出队操作,但一端同时只能有一个线程进行操作,是通过两个锁进行区分的。
为了维护底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
二、ArrayBlockingQueue
// 构造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 创建具有给定(固定)容量和指定访问策略的ArrayBlockingQueue 。
// 参数:capacity —— 这个队列的容量
fair —— 如果为true ,则在插入或删除时阻塞的线程的队列访问将按 FIFO 顺序处理;如果为false ,则未指定访问顺序。
public ArrayBlockingQueue(int capacity, boolean fair) {
// 如果capacity < 1,抛出异常
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
1、底层数据结构
ArrayBlockingQueue内部是使用数组实现一个队列的,并且在构造方法中就需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。
2、主要变量
// 元素数组,其长度在构造方法中指定
final Object[] items;
// 队列中实际的元素数量
int count;
// 保护所有通道的主锁
final ReentrantLock lock;
// 等待take条件的队列
private final Condition notEmpty;
// 等待put条件的队列
private final Condition notFull;
3、put(E e)
在此队列的尾部插入指定元素,如果队列已满,则等待空间可用
public void put(E e) throws InterruptedException {
// 检查元素是否为null,如果是,抛出NullPointerException
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 当队里中的元素数量等于数组长度,则队列已满,阻塞,等待队列成为不满状态
while (count == items.length)
notFull.await();
// 将元素入队
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
put方法总结:
1、ArrayBlockingQueue不允许添加null元素;
2、ArrayBlockingQueue在队列已满的时候,会调用notFull.await(),释放锁并处于阻塞状态;
3、一旦ArrayBlockingQueue在队列不满的时候,就立即入队。
4、E take()
取出此队列的头部元素,如果队列空,则阻塞,等待元素可取。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 当队列中元素数量为0时,则进入阻塞状态
while (count == 0)
notEmpty.await();
// 队列不为空是,调用dequeue()出队
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
take方法总结:
1、取元素时,一旦获得锁,队列为空, 则会阻塞,直至不为空,调用dequeue()出队。
5、总结
ArrayBlockingQueue是一个底层结构是数组的阻塞队列,是通过 ReentrantLock 和 Condition 来实现的。不可插入为null的元素,入队和出队使用的是同一个锁。意味着同一时刻只能有一个线程能进行入队或者出队操作。入队时,队列已满则会调用notFull.await(),进入阻塞状态。直到队列不满时,再进行入队操作。当出队时,队列为空,则调用notEmpty.await(),进入阻塞状态,直到队列不为空时,则出队。
三、LinkedBlockingQueue和ArrayBlockingQueue的区别
1、底层实现不同
LinkedBlockingQueue底层实现是链表,ArrayBlockingQueue底层实现是数组
2、队列容量
LinkedBlockingQueue默认的队列长度是Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多;
ArrayBlockingQueue必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。
3、锁的数量
LinkedBlockingQueue有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。
ArrayBlockingQueue只有一把锁,同时只能有一个线程进行入队和出队操作。