Java集合队列之ArrayBlockingQueue源码解析

ArrayBlockingQueue是一个有界阻塞队列,线程安全的,通过可重入锁+两个基于状态的锁条件队列保证,其内部是通过数组的方式存储数据的。该队列主要属性有:

// 数据存储 

final Object[]   items;      

/** items index for next take, poll, peek or remove */ // 读位置索引

int takeIndex;                   

/** items index for next put, offer, or add */// 写位置索引

int putIndex;                    

/** Number of elements in the queue */

  // 队列中的元素个数

int count;                      

  // 保障多线程环境下读写该队列的线程安全性 

final ReentrantLocklock;     

// 当获取数据的消费者线程被阻塞时,会将该线程放置到notEmpty等待队列中

private final Condition notEmpty;     

// 当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中

private final Condition notFull;

该队列的构造函数为

public ArrayBlockingQueue(int capacity,boolean fair) {

    if (capacity <=0)

         throw new IllegalArgumentException();

    this.items =new Object[capacity];    // 数组初始化

    lock =new ReentrantLock(fair);        // 锁初始化

    notEmpty =lock.newCondition();      // 读等待

    notFull =lock.newCondition();           // 写等待

}

1.1)向队列增加数据之put方法(当队列已满,则阻塞等待)

public void put(E e)throws InterruptedException {

checkNotNull(e);    // 验证数据是否为空,ArrayBlockingQueue队列不允许插入空数据

final ReentrantLock lock =this.lock;

lock.lockInterruptibly();

try {

while (count ==items.length)

     notFull.await();    // 如果队列已经满了,则将线程放到notFull等待队列中

enqueue(e);            // 否则,向队列中加入数据

}finally {

lock.unlock();

}

}

put方法中向队列插入数据方法enqueue

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

    final Object[] items =this.items;

    items[putIndex] = x;

if (++putIndex == items.length)

     putIndex =0;      // 数组数据已满,写索引移动到第一个位置

count++;                // 更新队列元素个数

notEmpty.signal();    // 通知有数据可消费

}

1.2)向队列增加数据之add方法(当队列已满时,则抛throw new IllegalStateException("Queue full");)

ArrayBlockQueue并没有重新实现add方法,而是直接调用父类AbstractQueue的add方法,即super.add()

当可以插入数据时,调用的是ArrayBlockQueue的offer方法

2.3)向队列增加数据之offer方法(当队列已满时,则直接返回false,不会阻塞当前线程执行)

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock =this.lock;

lock.lock();

try {

if (count ==items.length)

    return false;    // 队列已满,返回false

else {

    enqueue(e);

    return true;

}

}finally {

    lock.unlock();

}

}

2.1)从队列取数据方法之take方法

public E take()throws InterruptedException {

final ReentrantLock lock =this.lock;

lock.lockInterruptibly();

try {

while (count ==0)

    notEmpty.await();    // 如果队列无元素,则将线程放入notEmpty等待队列

return dequeue();

}finally {

lock.unlock();

}

}

take方法中dequeue方法

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

    final Object[] items =this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] =null;

if (++takeIndex == items.length)

takeIndex =0;

count--;

/** * itrs定义:内部类Itrs对象,迭代器集合

Shared state for currently active iterators, or null if there  are known not to be any. Allows queue operations to update iterator state. 

*/ 

if (itrs !=null)

     itrs.elementDequeued();

notFull.signal();

return x;

}

可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。关于Condition机制可参考另一篇文章:Java--Lock&Condition的理解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容