本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。
概念
ArrayBlockingQueue 是一个有界阻塞的队列。有界原因是它底层维护了一个数组,初始化时,可以直接指定。要注意,一旦创建成功后,数组将无法进行再扩容。而阻塞是因为它对入列出列做了加锁处理,如果队列满了,再入列则需要阻塞等待, 如果队列是空的,出列时也需要阻塞等待。
ArrayBlockingQueue 底层是一个有界数组,遵循FIFO原则,对进入的元素进行排序,先进先出。
ArrayBlockingQueue 使用ReentrantLock锁,再配合两种Condition实现队列的线程安全操作。并发环境下ArrayBlockingQueue 使用频率较高
ArrayBlockingQueue 支持公平与非公平2种操作策略,在创建对象时通过构造函数将fair参数设置为true/false即可,需要注意的是,如果fair设置为false,表示持有公平锁,这种操作会降低系统吞吐量,慎用。
内部结构
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; //存放元素数组
final ReentrantLock lock; //互斥锁对象
private final Condition notEmpty; //非空条件变量
private final Condition notFull; //非满条件变量
....
}
从内部结构源码上看,ArrayBlockingQueue 内部维护一个final数组,当队列初始化后将无法再进行拓展,保证队列的有界性。lock 互斥锁,在出队入队中保证线程的安全。而notEmpty 跟 notFull 条件变量保证队列在满队时入队等待, 当队列空列时,出队等待。
初始化
//参数1:队列初始长度
//参数2:是否为公平队列 fasle: 是, true 不是
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//参数3:队列初始化元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
.....
}
ArrayBlockingQueue 有3个构造器,核心是2个参数的构造器, capacity表示队列初始化长度, fair 指定ArrayBlockingQueue是公平队列还是非公平队列。
入列
ArrayBlockingQueue 入列方式有大体三种:
public class App {
public static void main(String[] args) throws InterruptedException {
//队列长度为2
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//方式1:满列抛异常
///System.out.println(queue.add("add")); //true
///System.out.println(queue.add("add")); //true
///System.out.println(queue.add("add")); //满列异常
//方式2:满列返回false,不阻塞
//System.out.println(queue.offer("offer")); //true
//System.out.println(queue.offer("offer")); //true
//System.out.println(queue.offer("offer")); //false
//方式3:满列阻塞(推荐)
queue.put("put");
queue.put("put");
queue.put("put"); //满列阻塞等待
}
}
这里我们以put方法为例
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //取锁: 线程运行中断
try {
while (count == items.length)
notFull.await(); //队列满队,需要暂停等待
enqueue(e); //入列
} finally {
lock.unlock(); //释放锁
}
}
在put方法开始前, 先获取可中断lock.lockInterruptibly(), 对put核心逻辑进行加锁,当判断到队列已满,阻塞当前线程。反之, 执行enqueue()实现入列逻辑。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; //入列
//putIndex 表示下一个入列所以, 如果为队列长度, 下一个轮回
//原因: 队列为数组, 操作所以从0开始
if (++putIndex == items.length)
putIndex = 0;
count++; //总数+1
notEmpty.signal(); //唤醒等待出列线程
}
进入enqueue之后, 因为该方法已经持有锁,所以无法再进行锁重入,在enqueue方法之后, 执行notEmpty.signal(); 唤醒出列等待线程。
出列
ArrayBlockingQueue 出列也对应的有3中方式
public class App {
public static void main(String[] args) throws InterruptedException {
//队列长度为2
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
queue.put("admin");
queue.put("admin");
//方式1:空列出队时,抛异常
//System.out.println(queue.remove());
//System.out.println(queue.remove());
//System.out.println(queue.remove()); //空列报异常
//方式2:空列出队时,返回null
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll()); //空列返回null
//方式3:空列出队时,阻塞(推荐)
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take()); //空列阻塞
}
}
这里我们以take方法为例
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 队长为0,需要暂停等待
return dequeue();
} finally {
lock.unlock();
}
}
跟put方法操作一样, 进入方法之后, 先获取锁,再判断队列长度是否为0, 如果为0, 当前线程进入阻塞。反之,进入dequeue 方法执行出列操作。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; //出列之后,原先队列设置为null
//takeIndex 下一个出列的数据索引, 一个轮回后,设置为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //唤醒等待入列线程
return x;
}
公平/非公平队列
ArrayBlockingQueue 可以实现公平与非公平2种队列, 公平队列表示在并发环境下,如果队列已经满列了,入列线程按照FIFO的顺序阻塞,等待召唤。非公平队列就没有这种规矩,谁先抢到,谁先入列。
来看一下例子:
需求:开启10个线程往边界为3的队列添加数据, 同时开始一个线程不断出列。
public class App {
public static void main(String[] args) throws InterruptedException {
//队列长度为3
//公平队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true);
for (int i= 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10)); //将问题放大
//线程进入
System.out.println("进入-"+ Thread.currentThread().getName());
//阻塞等待入列
queue.put("出列-" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t_" + i).start();
}
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
//按顺序出列
System.out.println("------" + queue.take());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
}
进入-t_5
进入-t_1
------出列-t_5
------出列-t_1
进入-t_8
------出列-t_8
进入-t_7
------出列-t_7
进入-t_2
------出列-t_2
进入-t_9
------出列-t_9
进入-t_0
------出列-t_0
进入-t_3
进入-t_6
------出列-t_3
------出列-t_6
进入-t_4
------出列-t_4
观察结果,发现进入顺序跟出列顺序一样。公平队列讲究公平, 进入0到9线程启动后,执行run方法,都能执行 “进入” 代码,但是入列的操作是阻塞的,同一时间点只允许一个线程进入。其他线程必须等待,那么谁先打印 “进入” 代码,就表示谁先阻塞,依照公平FIFO原则,就应该谁先出列。 所以当进入顺序与出列一致就把表示公平原则生效。
将参数改为false,我们再看打印结果
ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false);
进入-t_3
进入-t_7
进入-t_6
进入-t_0
进入-t_4
进入-t_8
进入-t_5
进入-t_1
------出列-t_3
------出列-t_7
------出列-t_6
------出列-t_8
------出列-t_4
------出列-t_5
------出列-t_1
------出列-t_0
进入-t_9
------出列-t_9
进入-t_2
------出列-t_2
观察, 很明显进入与出列顺序不一致,这就是非公平队列。
注意: 10个线程效果不是太明显,可以适当加大。
到这,本篇结束。
想获取更多技术视频,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html