概述
阻塞队列解决的问题:在一个容量有限的仓库里面,实现满了就挂起生产线程,空了就挂起消费线程的兼顾性能和安全的数据结构
接口的方法:
- 失败抛出异常: add、remove
- 阻塞等待:put、take
- 快速反馈:offer、pull、offer(timeout)
常用阻塞队列特性总结:通过加锁实现安全地读写(size、contains函数也是加锁读),通过condition或者AtomicInteger来协调生产和消费
- ArrayBlockQueue:1把锁,无法并发写,2个condition,通过await、signal来实现线程调度。FIFO队列的定长阻塞队列
- LinkedBlockQueue:2把锁,支持生产、消费并发执行,但是不支持并发的生产,通过AtomicInteger和CAS来控制库存。可以指定大小,默认无界队列。
- SynchronousQueue:没有容量的队列,put、take成为一对儿才可以执行,否则阻塞,add和remove如果没有配对成功,直接报错。用于快速响应的业务场景
- PriorityBlockingQueue:有优先级的队列,插入元素实现Compare接口,内部维护了一个最小堆,一把锁,基于数组,自动扩容的无界队列
ArrayBlockQueue
Demo
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue queue=new ArrayBlockingQueue(10);
for(int i=0;i<20;i++){
queue.put("obj:"+i); //阻塞
System.out.println("put "+i);
}
}
- 基于定长数组的有界队列,初始化的时候必须制定队列的大小,1个锁,2个condition对象
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
- 严格的先进先出队列,基于takeIndex、putIndex、length、count等参数实现FIFO
实现原理:就是一个封装了的一个Lock,2个condition,套路一样:
- 第一步加锁
- 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
- 第三步调用signal()消费线程
- 第四步释放锁
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
- 第一步加锁
lock.lockInterruptibly();
try {
- 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
while (count == items.length){
notFull.await();
}
insert(e);
} finally {
- 第四步释放锁
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
- 第三步调用signal()消费线程
notEmpty.signal();
}
LinkedBlockQueue
- LinkedBlockQueue内部是基于单向链表的队列,可以设置capacity来实现有界队列,也可以不设置,默认是无界队列。无界队列的时候,put、add、offer就是一样的啊,因为没有限制,所以插入肯定成功.
- 内部的实现机制是2把锁,各有一个condition,通过AtomicInteger来指定库存,CAS操作来自增或者自减库存,从而实现线程安全
- 生产线程和消费线程可以并行的进行操作,提升性能
- 生产线程和消费线程的交互和常规的方式不一样,生产线程只在当前库存为0的时候,才会触发 消费线程的signal(),进行了性能优化,因为当库存不为0的时候,不会有消费线程阻塞。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private transient Node<E> head;
private transient Node<E> last;
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private void enqueue(Node<E> node) {
last = last.next = node;
}
- put的过程
- 对putLock加锁
- while循环判断是否库存满了,满了就阻塞,未满就入队,然后CAS的方式自增并返回老的库存值
- 如果新库存仍然未满,会触发其它的生产线程生产
- 释放putLock锁
- 如果原库存为0,会触发其它的消费线程开始消费
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
SynchronousQueue(同步队列)
- 没有容量的一个特殊队列
- 执行阻塞的方法 put 和 take的时候正常,所有的put方法有一个阻塞的公平队列或者非公平队列,所有的take操作也有一个类似的队列
- 执行非阻塞的方法 add()和remove()方法时,必须有对应的take和put方法阻塞着,不然就会报错
- peek(返回首位但是不删除元素)永远返回null,因为不存储元素
public static void main(String[] args) throws Exception {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();//初始化不能带长度
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
String str = queue.take(); //线程1在获取,这是阻塞的,当线程2一添加,线程1就获取,因为SynchronousQueue是没有容量的
System.out.println("take:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
queue.put("abcde");
} catch (InterruptedException e) {
}
System.out.println("add:"+"abcde");
}
});
t3.start();
}
PriorityBlockingQueue
- 有优先级但是无界的阻塞队列,类似于List,支持自动扩容,可以指定初始化大小,也可以不指定。实际是一个Arr[]
- 内部有一个最小堆,插入和取出的时候,都要构建堆有序
- add()的时候不会报错,因为容量无限
- 支持元素实现Compare接口,或者PriorityBlockingQueue初始化的时候传入一个Compare接口实现类,两种方式进行比较优先级
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>(); //因为是无界队列,初始化可以不定义长度
Task t1 = new Task();
t1.setId(1);
t1.setName("任务 1");
Task t2 = new Task();
t2.setId(4);
t2.setName("任务 2");
Task t3 = new Task();
t3.setId(3);
t3.setName("任务 3");
queue.add(t2);
queue.add(t3);
queue.add(t1);
queue.take(); // 取出来最小的 t1 ,任务1
DelayQueue
- 内部有一个PriorityBlockingQueue,最先到期的元素放在堆顶。
- 里面的元素必须要实现Delayed接口
入队等待线程
DelayQueue< Student> students = new DelayQueue<Student>();
students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));
出队消费线程
while(!Thread.interrupted()){
students.take().run();
}
参考资料
BlockQueue详细介绍
https://blog.csdn.net/qq_38872310/article/details/80832703