ArrayBlockingQueue与LinkedBlockingQueue的源码对比
1. ArrayBlockingQueue
1.1 类内成员与方法
ArrayBlockingQueue
是基于数组的实现。内部维护了一个数组,只有一个内部锁,还有两个 Condition
。
/**
* 队列元素数组
*/
final Object[] items;
/**
* 下一个获取的位置
*/
int takeIndex;
/**
* 下一个加入队列的位置
*/
int putIndex;
/**
* 当前数量
*/
int count;
/**
* 锁
*/
final ReentrantLock lock;
/**
* 用于非空唤醒
*/
private final Condition notEmpty;
/**
* 用于非满队列唤醒
*/
private final Condition notFull;
offer
方法。该方法是入队时如果队列已满,不会进行阻塞。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
enqueue
方法
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒notEmpty.await()的线程
notEmpty.signal();
}
put
方法,该方法会在入队时,如果队列已满,则notFull.await();
阻塞,等待出队时的唤醒。这里如果换成自旋的形式,会造成cpu空转,降低性能。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
poll
方法,该方法在出队时,如果队列中没有数据,则直接返回null,不会阻塞。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
take
方法该方法在出队时,如果队列中没有数据,会阻塞,等待入队时的唤醒notEmpty.signAll
唤醒
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
dequeue
方法
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
根据上述的方法,会产生一个疑问,put
和take
方法两边都有加锁操作,假设put
方法成功lock,但是队列已满,则进入阻塞,那么take
方法在put
唤醒前都无法获得锁,而put
阻塞又是等待take
的唤醒,产生死锁了。但是事实并非如此,接下来走进Condition
的await
和signal
方法。
1.2 await
和signal
方法
先看看await
方法,该方法先将当前线程加入到等待队列中,再将资源释放,通过isOnSyncQueue
检测节点是否在aqs的等待队列中,如果进入到循环中,则进行线程阻塞,等待被唤醒。当线程被唤醒后再通过isOnSyncQueue
检测节点,跳出循环后,会尝试获取资源如果获取资源不成功,将又会回到aqs中的acquireQueued
方法,加入到等待队列,等待唤醒。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到等待队列中
// 这里注意,这个队列与aqs的等待队列不同,这个是由nextWaiter建立,是个单向的链表
Node node = addConditionWaiter();
// 释放资源,将资源变为0,也就是state设置为0,让其他线程能继续获得锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 检测当前节点是否在同步队列中,这里指的SyncQueue是指aqs的双向队列
while (!isOnSyncQueue(node)) {
// 阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试获取资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
再看看signal
方法,该方法主要是从头节点开始,逐个遍历,将满足条件的唤醒。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒首个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
// 找出第一个满足唤醒条件的节点,不满足的直接移除。
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
1.3 总结
ArrayBlockingQueue
是基于数组实现的有界队列,内部只维护了一把锁,所以出队入队都需要锁住整个队列,影响吞吐。锁使用的是Reentrantlock,并维护了两个非空,非满的Condition,使用Condition更加自由的控制唤醒。如出队时,锁的流程就是队列中不存在数据,则将当前线程节点加入到waiter单向队列中,当队列新增了数据时,会signal唤醒首个waiter节点,节点唤醒后竞争ReentrantLock,如果失败了则会加入到aqs的双向等待队列中,等待再次唤醒竞争。这里要注意的是,执行remove操作时,会将整个数组进行移动(最坏情况下),同时还会获得锁,对性能的影响比较大。
2. LinkedBlockingQueue
2.1 类内成员与方法
LinkedBlockingQueue
是基于链表的实现,维护着一个Node
节点,与ArrayBlockingQueue
不同的是,LinkedBlockingQueue
内部维护了两个ReentrantLock
,把出队和入队分开加锁。同样也维护了两个Condition
。
/**
* 容量,如果不设置容量,默认大小是Integer.MAX_VALUE
*/
private final int capacity;
/**
* 当前队列内节点数,这里不像ArrayBlockingQueue直接使用基础类型
* 因为分开了两把锁,所以需要用原子类型进行增加减少。
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 头结点
*/
transient Node<E> head;
/**
* 尾结点
*/
private transient Node<E> last;
/**
* 出队锁
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 非空条件
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 入队锁
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 非满条件
*/
private final Condition notFull = putLock.newCondition();
offer
方法,该方法如果当前数量已达到设定的容量,直接返回,并不会发生阻塞。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果当前数量已达到设定的容量,则直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果当前数量未达到容量大小
if (count.get() < capacity) {
// 入队
enqueue(node);
// 数量+1
c = count.getAndIncrement();
// 如果当前数量小于容量,则唤醒入队线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 唤醒出队操作
signalNotEmpty();
return c >= 0;
}
put
方法,该方法与offer
方法基本一致,唯一不同就是在于队列已满,阻塞等待。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列已满,阻塞等待
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
enqueue
方法
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
poll
方法,该方法在队列中没有元素时直接返回,不会阻塞。
public E poll() {
final AtomicInteger count = this.count;
// 没有元素直接返回空
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 这里再次校验当前元素个数
if (count.get() > 0) {
// 出队
x = dequeue();
// 当前数量减一
c = count.getAndDecrement();
// 如果当前数量大于1,唤醒出队线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// c是出队前的数量,如果等于容量,则队列未满
if (c == capacity)
signalNotFull();
return x;
}
take
方法,该方法与poll
基本一致,不同在于 队列为空时,阻塞等待。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空时,阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue
方法
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
2.2 总结
LinkedBlockingQueue
是基于链表的实现,是单向链表,与ArrayBlockingQueue
不同的是,它内部维护了两个ReentrantLock
,分别用于控制出队与入队,提高了并发性能。同样也维护了两个非空,非满的Condition,使用Condition更加自由的控制唤醒。LinkedBlockingQueue
是有界的队列,如果不指定容量大小,将默认为Integer.MAX_VALUE,如果出队速度跟不上入队速度,那么就有可能会内存溢出。
3. 两者的区别
1、ArrayBlockingQueue
基于数组实现;LinkedBlockingQueue
基于单链表实现。
2、ArrayBlockingQueue
出队入队都用同一把锁,出队入队互斥;LinkedBlockingQueue
出队一把锁,入队一把锁,相互不会干扰,提高了并发性能。
3、ArrayBlockingQueue
必须指定容量;LinkedBlockingQueue
可以指定容量,如果不指定则默认为Integer.MAX_VALUE,有可能会造成内存溢出问题。
4、ArrayBlockingQueue
执行remove时需要将元素移动;LinkedBlockingQueue
执行remove只需找到目标节点,进行删除即可。