一.层次图
-
7个阻塞队列:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
二.BlockingQueue接口
BlockingQueue是一个支持两个附加操作的队列:
- 这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
- 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
三.ArrayBlockingQueue 实现类
- 一个由数组支持的有界阻塞队列,这是一个典型的“有界缓存区”.一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
- 通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。默认不支持。
- 结构
//容器
final Object[] items;
//锁
final ReentrantLock lock;
//通知队列
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();//返回一个AQS的ConditionObject
notFull = lock.newCondition();
}
- 公平模式时当容器满时如何阻塞
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//如果线程已被中断则直接抛出异常,否尝试获取锁,若获取锁失败,先将当前线程加入
AQS队列,之后获取队列中的Node节点,只有pre节点为head节点才获取再次尝试获取锁,
若此时获取锁失败或pre节点不为head节点时,让当前线程阻塞。(详细请看AQS章节)
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();//若容器满,则将当前线程加入通知队列(详细请看AQS章节)
//若容器不满,则将值插入数组
insert(e);
} finally {
lock.unlock();//释放当前线程
}
}
四.LinkedBlockingQueue实现类
- 一个基于已链接节点的、范围任意的 blockingqueue。此队列按 FIFO(先进先出)排序元素,链接队列的吞吐量通常要高于基于数组的队列,如果未指定容量,则它等于 Integer.MAX_VALUE。
- 结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
五.PriorityBlockingQueue
- 一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,元素按照升序排列;也可以通过比较器comparator来指定元素的排序规则。
// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//优先级队列,表示一个平衡二叉树堆:
private transient Object[] queue;
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
- 无界队列是如何扩容的
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
//如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return true;
}