ArrayBlockingQueue
基于数组的有界BlockingQueue。队列的头节点是最早入队的节点
ArrayBlockingQueue核心源码解读
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 队列中元素数组
final Object[] items;
// 从队列中获取或取走元素的索引位置
int takeIndex;
// 从队列中插入元素的索引位置
int putIndex;
// 队列中的元素个数
int count;
// 队列中的所有操作的锁
final ReentrantLock lock;
// 从队列中取走元素的条件
private final Condition notEmpty;
// 往队列中插入元素的条件
private final Condition notFull;
// i--
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
// 随机访问元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
// 入队操作,只能在持有锁的情况下可以调用此方法
private void enqueue(E x) {
final Object[] items = this.items;
// 将元素放入队列
items[putIndex] = x;
// putIndex++,如果putIndex等于队列的容量,则重置putIndex = 0的位置
if (++putIndex == items.length)
putIndex = 0;
// 队列中元素个数 + 1
count++;
// 唤醒有出队操作的线程,可以取出元素
notEmpty.signal();
}
// 出队操作,只能在持有锁的情况下可以调用此方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 将队列中的元素删除
items[takeIndex] = null;
// takeIndex++,如果takeIndex等于队列的容量,则重置takeIndex = 0的位置
if (++takeIndex == items.length)
takeIndex = 0;
// 队列中元素个数 - 1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒有入队操作的线程,可以插入元素了。
notFull.signal();
return x;
}
// 删除指定位置的元素,注意只能在持有锁的操作可以调用此方法
// 此方法效率低,通常不建议使用,因为它需要移动元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 如果要删除的元素在队列的头节点,那么和take、poll或remove方法逻辑一样
if (removeIndex == takeIndex) {
// 删除元素
items[takeIndex] = null;
// 如果takeIndex等队列容量,则将takeIndex重置0
if (++takeIndex == items.length)
takeIndex = 0;
// 队列中的元素个数 - 1
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 下次插入队列的索引位置
final int putIndex = this.putIndex;
// 从删除元素的索引位置遍历直到putIndex位置
for (int i = removeIndex;;) {
int next = i + 1;
// 如果到队列的未尾,将索引位置置0
if (next == items.length)
next = 0;
// 如果不等于putIndex,将元素移动到前一个位置
if (next != putIndex) {
items[i] = items[next];
i = next;
} else { // 如果等于putIndex,移动元素完毕
// 将i位置的元素置null
items[i] = null;
// putIndex指向i
this.putIndex = i;
break;
}
}
// 队列中的元素个数 - 1
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
// 构造指定容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 构造指定容量,可以设置采用公平或非公平锁的策略
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 创建Object数组
this.items = new Object[capacity];
// 创建ReentrantLock对象
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
// 插入元素,如果队列已满,抛异常
public boolean add(E e) {
return super.add(e);
}
// 插入元素,如果队列已满,则返回false
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 获取锁,此时会阻塞
try {
// 队列已满,直接返回false
if (count == items.length)
return false;
else { // 入队操作
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 插入元素,如果队列已满,则进入阻塞状态。支持中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满
while (count == items.length)
notFull.await(); // 线程阻塞
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}
// 插入元素,如果在指定的超时时间内没有插入成功,则返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); // 线程阻塞指定的时间
}
// 入队
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
// 从队列中拿走元素,如果队列已空,则返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列已空返回null,否则出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 从队列中拿走元素,如果队列已空,则该线程阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列为空
while (count == 0)
notEmpty.await(); // 线程阻塞
return dequeue();
} finally {
lock.unlock();
}
}
// 从队列中拿走元素,如果队列已空,则该线程阻塞指定的超时时间,如果在指定时间没有拿到元素,则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列为空
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos); // 线程阻塞指定时间
}
return dequeue();
} finally {
lock.unlock();
}
}
// 从队列头部获取元素,如果为空则返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 队列的元素个数
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
// 队列中剩余的容量大小
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count)
a[count] = null;
} finally {
lock.unlock();
}
return a;
}
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]";
final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
}