简单介绍
ArrayBlockingQueue
是基于数组的有界阻塞队列。
-
有界
指它不能够存储无限多数量的元素,在创建ArrayBlockingQueue
时,必须要给它指定一个队列的大小 -
阻塞
指在添加 / 取走元素时,当队列 没有空间 / 为空的时候会阻塞,知道队列有空间 / 有新的元素加入时再继续
源码解读
属性
- 队列集合,是一个数组,用来存放元素
/** The queued items */
final Object[] items;
- 调用
take
,poll
,peek
或者remove
方法所取元素的下标位置
/** items index for next take, poll, peek or remove */
int takeIndex;
- 调用
put
,offer
或者add
方法添加元素时,所添加的位置
/** items index for next put, offer, or add */
int putIndex;
- 队列的所有元素数目
/** Number of elements in the queue */
int count;
- 全局锁,掌管所有访问操作
/** Main lock guarding all access */
final ReentrantLock lock;
- 取元素操作的等待条件,如果队列中没有元素,会调用
notEmpty.await()
方法让当前线程处于等待状态
/** Condition for waiting takes */
private final Condition notEmpty;
- 新增元素操作的等待条件,如果队列中已经满元素,会调用
notFull.await()
方法让当前线程处于等待状态
/** Condition for waiting puts */
private final Condition notFull;
- 这个
Itrs
是迭代器和队列之间数据共享的工具类(这块代码太多了有空再看把 - -||| )
transient Itrs itrs = null;
其他方法
- 参数自减的方法,就是当传入的参数
i
为0
时返回 数组长度减1
,否则返回i
减1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
- 返回对应位置上的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
- 参数
v
的非空判断
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
核心方法
向队列中添加元素
add(e)
、offer(e)
和 put(e)
都是添加元素的方法, add(e)
和 offer(e)
是无阻塞的添加, put(e)
是阻塞添加
-
add(e)
方法,实际上调用了offer(e)
方法
public boolean add(E e) {
return super.add(e);
}
-
offe(e)
方法,将元素添加到BlockingQueue
里,如果可以容纳返回true
否则返回false
public boolean offer(E e) {
checkNotNull(e); // 检查元素是否为 null
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
if (count == items.length) // 如果队列已经满了返回 false
return false;
else { // 队列还没有满,则添加到队列中
enqueue(e); // 进队
return true;
}
} finally {
lock.unlock(); // 释放锁
}
}
-
put(e)
方法,将元素添加到BlockingQueue
里,如果BlockQueue
没有空间,则调用此方法的线程被阻塞,直到BlockingQueue
里面有空间
public void put(E e) throws InterruptedException {
checkNotNull(e); // 检查元素是否为 null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁
try {
while (count == items.length)
notFull.await(); //如果队列已经满了,就阻塞(添加到 notFull 条件队列中等待唤醒)
enqueue(e); // 如果队列没有满直接添加
} finally {
lock.unlock(); // 释放锁
}
}
-
enqueue
进队操作
private void enqueue(E x) {
// 获取当前数组
final Object[] items = this.items;
// 通过索引赋值
items[putIndex] = x;
// 如果当前添加对象的位置 +1 等于 数组的长度,也就是当前对象的位置在数组的最后一个
// 那么下一个应该从数组的第一个添加
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒正在等待获取对象的线程
notEmpty.signal();
}
- 除了以上三种常用的添加方法之外,还有个带超时时间的添加方法
offer(e, timeout, unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
从队列中取元素
-
poll()
方法,取队头(首个)元素并删除,没有则返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中有元素,则执行 dequeue 操作,否则返回 null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
-
take()
方法,如果队列中有元素,则获取并删除,如果没有元素,则阻塞等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 如果队列中没有元素,则添加到 notEmpty 条件队列中等待
return dequeue();
} finally {
lock.unlock();
}
}
-
poll(timeout, unit)
带阻塞超时的取首个元素的方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
-
peek()
方法,只取不删,当队列中没有元素时会返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
-
dequeue
出队操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 根据索引获取对象
E x = (E) items[takeIndex];
// 当前位置的对象被取走,位置就腾出来了
items[takeIndex] = null;
// 如果被取走的是数组的最后一个,那下一个要从第一个取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒正在等待添加对象的线程
notFull.signal();
return x;
}
删除队列中某个元素
-
remove(o)
方法,如果队列为空或者没有找到该元素返回false
,否则删除元素并且返回true
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex; // 从取得位置开始,到添加的位置
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length) // 若果判断的位置到了队列最末尾,那下一个从第一个判断
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
-
removeAt(index)
方法,删除指定位置上的元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) { // 当删除的元素是下次取操作要取到的元素时,既队头元素
items[takeIndex] = null; // 删除队头元素,并且 takeIndex 加 1
if (++takeIndex == items.length) // 如果删除得是数组最后一个元素,则 takeIndex 从数组第一个元素开始
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 如果删除的不是队头元素
// 则从删除元素的后面一直到添加元素的位置(removeIndex ~ putIndex)期间的元素都要往前挪一个位置
// 取 putIndex 作为循环结束判断条件
final int putIndex = this.putIndex;
for (int i = removeIndex;;) { // 顺序往前挪一个位置
int next = i + 1;
if (next == items.length)// 当循环到数组最后一个元素,下一个元素应该是数组第一个元素
next = 0;
if (next != putIndex) { // 如果查找的索引不等于要添加元素的索引,说明元素可以再移动
items[i] = items[next];
i = next;
} else { // 在 removeIndex 索引之后的元素都往前移动完毕后清空最后一个元素
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
啊,累了,不写了