ArrayBlockingQueue是一个有界阻塞队列,线程安全的,通过可重入锁+两个基于状态的锁条件队列保证,其内部是通过数组的方式存储数据的。该队列主要属性有:
// 数据存储
final Object[] items;
/** items index for next take, poll, peek or remove */ // 读位置索引
int takeIndex;
/** items index for next put, offer, or add */// 写位置索引
int putIndex;
/** Number of elements in the queue */
// 队列中的元素个数
int count;
// 保障多线程环境下读写该队列的线程安全性
final ReentrantLocklock;
// 当获取数据的消费者线程被阻塞时,会将该线程放置到notEmpty等待队列中
private final Condition notEmpty;
// 当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中
private final Condition notFull;
该队列的构造函数为:
public ArrayBlockingQueue(int capacity,boolean fair) {
if (capacity <=0)
throw new IllegalArgumentException();
this.items =new Object[capacity]; // 数组初始化
lock =new ReentrantLock(fair); // 锁初始化
notEmpty =lock.newCondition(); // 读等待
notFull =lock.newCondition(); // 写等待
}
1.1)向队列增加数据之put方法(当队列已满,则阻塞等待)
public void put(E e)throws InterruptedException {
checkNotNull(e); // 验证数据是否为空,ArrayBlockingQueue队列不允许插入空数据
final ReentrantLock lock =this.lock;
lock.lockInterruptibly();
try {
while (count ==items.length)
notFull.await(); // 如果队列已经满了,则将线程放到notFull等待队列中
enqueue(e); // 否则,向队列中加入数据
}finally {
lock.unlock();
}
}
put方法中向队列插入数据方法enqueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items =this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex =0; // 数组数据已满,写索引移动到第一个位置
count++; // 更新队列元素个数
notEmpty.signal(); // 通知有数据可消费
}
1.2)向队列增加数据之add方法(当队列已满时,则抛throw new IllegalStateException("Queue full");)
ArrayBlockQueue并没有重新实现add方法,而是直接调用父类AbstractQueue的add方法,即super.add()
当可以插入数据时,调用的是ArrayBlockQueue的offer方法
2.3)向队列增加数据之offer方法(当队列已满时,则直接返回false,不会阻塞当前线程执行)
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock =this.lock;
lock.lock();
try {
if (count ==items.length)
return false; // 队列已满,返回false
else {
enqueue(e);
return true;
}
}finally {
lock.unlock();
}
}
2.1)从队列取数据方法之take方法
public E take()throws InterruptedException {
final ReentrantLock lock =this.lock;
lock.lockInterruptibly();
try {
while (count ==0)
notEmpty.await(); // 如果队列无元素,则将线程放入notEmpty等待队列
return dequeue();
}finally {
lock.unlock();
}
}
take方法中dequeue方法
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items =this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] =null;
if (++takeIndex == items.length)
takeIndex =0;
count--;
/** * itrs定义:内部类Itrs对象,迭代器集合
Shared state for currently active iterators, or null if there are known not to be any. Allows queue operations to update iterator state.
*/
if (itrs !=null)
itrs.elementDequeued();
notFull.signal();
return x;
}
可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。关于Condition机制可参考另一篇文章:Java--Lock&Condition的理解