ArrayBlockingQueue底层是用数组实现的有界(即大小固定)的FIFO队列,其中利用两个字段,将该数组构造成了环形数组。ArrayBlockingQueue体现了生产者-消费者模型,通过ReentrantLock和Condition实现了资源的互斥访问和线程间的通信,保证了线程安全。
1. ArrayBlockingQueue继承关系图
2. ArrayBlockingQueue源码分析
2.1 字段
// 这4个字段无需被volatile修饰,因为锁可以保证它们的可见性
// 底层数组
final Object[] items;
// 利用takeIndex和putIndex将数组构造成了环形数组
// takeIndex表示下次take、poll、peek、remove时元素的索引
// (即环形数组中第一个元素的索引)
int takeIndex;
// putIndex表示下次put、offer、add时元素的索引
// (即环形数组中最后一个元素下一个位置的索引)
int putIndex;
// 队列中元素的个数
int count;
// 锁
final ReentrantLock lock;
// 出队、入队时要用的两个Condition(经典双条件算法)
private final Condition notEmpty;
private final Condition notFull;
2.2 三个构造方法
(1)ArrayBlockingQueue(int,boolean)
public ArrayBlockingQueue(int capacity, boolean fair) {
// 检查容量
if (capacity <= 0)
throw new IllegalArgumentException();
// 创建指定大小的数组
this.items = new Object[capacity];
// fair为true,表示创建公平锁,为false,表示创建非公平锁
// 公平性通常会降低吞吐量,但可以避免饥饿
lock = new ReentrantLock(fair);
// 创建基于ReentrantLock的两个Condition
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
(2)ArrayBlockingQueue(int)
public ArrayBlockingQueue(int capacity) {
// 调用ArrayBlockingQueue(int,boolean)
// 第二个参数为false,表示使用非公平锁
this(capacity, false);
}
(3)ArrayBlockingQueue(int,boolean,Collection)
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 调用ArrayBlockingQueue(int,boolean)初始化各个字段
this(capacity, fair);
// 不会出现多个线程同时构造一个ArrayBlockingQueue
// 对象,所以这里加锁是为了保证可见性,而不是互斥性
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
// 将c中元素添加到items中
for (E e : c) {
checkNotNull(e); // e不能为null
items[i++] = e;
}
// 若c中元素个数超过items的大小,抛出越界异常
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i; // 初始化count
// 更新putIndex,takeIndex默认为0
// 注意putIndex是环形数组中最后一个元素下一个位置
// 的索引,takeIndex是环形数组中第一个元素的索引
putIndex = (i == capacity) ? 0 : i;
} finally {
// 解锁
lock.unlock();
}
}
2.3 辅助方法
(1)enqueue
// 入队
private void enqueue(E x) {
final Object[] items = this.items;
// 将x存至putIndex处
items[putIndex] = x;
// 检查、维护环形数组
if (++putIndex == items.length)
putIndex = 0;
// 更新count
count++;
// 唤醒一个通过notEmpty.await或notEmpty.awaitNanos阻塞着的消费者线程
// 其实调用notEmpty.signal后只是将该消费者线程对应的节点从notEmpty
// 中维护的条件队列转移到了ReentrantLock维护的同步队列中,当某个线
// 程释放锁且唤醒的是该消费者线程时,该消费者线程才算被真正唤醒
notEmpty.signal();
}
(2)dequeue
// 出队
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 从takeIndex处取出元素
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 检查、维护环形数组
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 更新count
// itrs相关(略)
// 唤醒一个通过notFull.await或notFull.awaitNanos阻塞着的生产者线程
notFull.signal();
return x;
}
2.4 添加元素
(1)put-无限期阻塞版
public void put(E e) throws InterruptedException {
checkNotNull(e); // e不能为null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 会处理中断的锁
try {
//ArrayBlockingQueue的put、take等方法中用while的原因(通过两个例子进行说明):
// 例1:非公平锁、队列容量为2时的情形:
// 假设有三个线程thread0、thread1、thread2,初始时,thread0调用了两次put添加了
// 两个元素,之后thread1获取到锁调用put后会阻塞(因为队列已满),之后thread2获取
// 到锁,调用了take取出一个元素并调用了notFull.signal后释放锁,之后thread0先获
// 取到锁调用了一次put后释放锁,之后thread1才获取到锁,此时队列仍是满的(while
// 中的条件再次成立),因此thread1会继续阻塞,所以必须得用while
// 例2:公平锁、队列容量为2时的情形:
// 将上面用非公平锁的例子改为:thread0第三次put时,thread2还未释放锁且还未调用
// notFull.signal,之后thread0对应的Node先加入同步队列,之后thread2调用notFull.signal
// 将thread1对应的Node加入到同步队列,之后thread2释放锁,先唤醒thread0,被唤醒
// 的thread0添加完元素后释放锁,唤醒thread1,此时队列仍然是满的,while中的条件
// 再次成立,因此thread1会继续阻塞,所以必须得用while
// 队列已满,无法放入新元素
while (count == items.length)
// 阻塞等待被其他消费者线程唤醒
notFull.await();
// 入队
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}
(2)add-抛异常版
public boolean add(E e) {
// 调用AbstractQueue.add
return super.add(e);
}
// AbstractQueue.add
public boolean add(E e) {
// offer返回true,表示(队列未满)添加成功,
// 返回false,表示(队列已满)添加失败
if (offer(e))
return true;
else // 添加失败,抛出异常
throw new IllegalStateException("Queue full");
}
(3)offer(E)-非阻塞版
public boolean offer(E e) {
checkNotNull(e); // e不能为null
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列已满,直接返回false
if (count == items.length)
return false;
else { // 队列未满
// 入队
enqueue(e);
// 入队成功,返回true
return true;
}
} finally {
// 解锁
lock.unlock();
}
}
(4)offer(E,long,TimeUnit)-限期阻塞版
// timeout是时长,TimeUnit是时间单位
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e); // e不能为null
// 转化为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列已满
while (count == items.length) {
// 第二次到这里说明while中的条件仍不成立,队列
// 仍然是满的,已经达到阻塞时长,直接返回false
if (nanos <= 0)
return false;
// 阻塞指定时长这种情况下if中的条件会不成立
// (也可能被notFull.signal提前唤醒,这种情况下while中的条件会不成立)
// awaitNanos返回的是(粗略计算):
// (阻塞前的系统时间+nanos)-阻塞结束后的系统时间
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(e);
// 入队成功,返回true
return true;
} finally {
// 解锁
lock.unlock();
}
}
2.5 获取(删除)元素
(1)take-无限期阻塞版
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列为空
while (count == 0)
// 阻塞等待被其他消生产者线程唤醒
notEmpty.await();
// 出队
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
(2)poll()-非阻塞版
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列为空直接返回null
return (count == 0) ? null : dequeue();
} finally {
// 解锁
lock.unlock();
}
}
(3)poll(long,TimeUnit)-限期阻塞版
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转化为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 与offer(E,long,TimeUnit)是相对的
// 队列为空
while (count == 0) {
if (nanos <= 0)
return null;
// 阻塞指定时长或被notEmpty.signal提前唤醒
nanos = notEmpty.awaitNanos(nanos);
}
// 出队
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
(4)remove-删除指定元素
// 注意take、poll()、poll(long,TimeUnit)的返回值类型都是E,外部可接收从队列中
// 移除的元素,而remove则是直接删除队列中第一个满足o.equals(items[i])的元素
public boolean remove(Object o) {
// o为null直接返回false,因为队列中允许存在为null的元素
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 因为用的do-while,所以这里必须先判断count是否大于0
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 强调:putIndex是环形数组最后一个元素后一个位置的索引
// 删除takeIndex到putIndex-1之间第一个满足o.equals(items[i])的元素
do {
if (o.equals(items[i])) {
// 删除i处的元素,并将i+1到putIndex-1之间的所有元素向前移动
removeAt(i);
// 已经将一个满足的移除,返回true
return true;
}
// 检查、维护环形数组
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
// 不存在满足o.equals(items[i])的元素,返回false
return false;
} finally {
// 解锁
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 这里算是一个小优化:
// 当removeIndex是takeIndex时,仅将takeIndex加1,否则在else
// 中移动putIndex,将removeIndex ~ putIndex的所有元素向前移动
if (removeIndex == takeIndex) {
items[takeIndex] = null;
// 检查、维护环形数组
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// itrs相关(略)
} else {
final int putIndex = this.putIndex;
// i初始值为removeIndex
for (int i = removeIndex;;) {
int next = i + 1;
// 检查、维护环形数组
if (next == items.length)
next = 0;
// next不为putIndex
if (next != putIndex) {
items[i] = items[next];
// 用next更新i
// 因为是环形数组,所有不能直接i++
i = next;
} else {
items[i] = null;
// 因为是环形数组,所有不能直接this.putIndex++
this.putIndex = i;
break; // 移动完毕,跳出循环
}
}
count--;
// itrs相关(略)
}
// 唤醒一个通过notFull.await或notFull.awaitNanos阻塞着的生产者线程
notFull.signal();
}
2.6 查看元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回item[takeIndex]
// 当队列为空时,item[takeIndex]为null
return itemAt(takeIndex); // null when queue is empty
} finally {
// 解锁
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}