LinkedBlockingQueue是用链表实现的FIFO队列,该队列大小默认是Integer.MAX_VALUE,可以认为是无界的,也可指定大小,使其有界。基于链表的队列通常比基于数组的队列有更高的吞吐量,但大多数并发场景下,实际的性能优劣不易测量。
LinkedBlockingQueue通过一个哑铃节点、takeLock和putLock、(AtomicInteger类型的)count,以及级联通知实现了双锁队列算法(的变体),提高了性能。
java.util包下的迭代器是fail-fast的,即当迭代器创建后,若外部通过非迭代器自身的方法修改集合的内容,会抛出ConcurrentModificationException异常;而java.util.concurrent包下的迭代器是弱一致性的,即当迭代器创建后,其他线程通过非迭代器自身的方法修改集合的内容并不会抛出异常。
1. LinkedBlockingQueue继承关系图
2. LinkedBlockingQueue源码分析
2.1 内部类
// 底层单向链表的Node
static class Node<E> {
E item;
// 为了LinkedBlockingQueue.Itr的弱一致性,可能出现next指向当前节点自身的情况
Node<E> next;
Node(E x) { item = x; }
}
2.2 字段
// 队列容量(未指定时默认是Integer.MAX_VALUE)
private final int capacity;
// 队列中元素的个数
// 因为添加、获取(删除)操作使用各自的锁,且两种操作都会修改count
// 的值,因此使用AtomicInteger类型的count来保证原子性(和可见性)
private final AtomicInteger count = new AtomicInteger();
// 底层链表的头节点
transient Node<E> head;
// 底层链表的尾节点
private transient Node<E> last;
// take、poll等方法使用的ReentrantLock和Condition
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// put、offer等方法使用的ReentrantLock和Condition
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
2.3 三个构造方法
(1)LinkedBlockingQueue(int)
public LinkedBlockingQueue(int capacity) {
// 检查capacity
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// last和head初始时都指向该哑铃节点,正是由于该节点,实现了添加元素和获取(删除)元素
// 两种操作的解耦,使得LinkedBlockingQueue这个基于链表的阻塞队列可以采用双锁队列算法
last = head = new Node<E>(null);
}
(2)LinkedBlockingQueue()
public LinkedBlockingQueue() {
// 调用LinkedBlockingQueue(int)
this(Integer.MAX_VALUE);
}
(3)LinkedBlockingQueue(Collection)
public LinkedBlockingQueue(Collection<? extends E> c) {
// 调用LinkedBlockingQueue(int)
this(Integer.MAX_VALUE);
// 要(在队尾)添加元素,所以使用putLock锁
final ReentrantLock putLock = this.putLock;
// 实例化时不存在多线程竞争,这里上锁是为了可见性
putLock.lock();
try {
int n = 0;
for (E e : c) {
// e不能为null
if (e == null)
throw new NullPointerException();
// 注意n从0开始
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e)); // 入队
++n;
}
// 更新count.value的值
count.set(n);
} finally {
putLock.unlock();
}
}
2.4 辅助方法
(1)enqueue
// 将node添加到队列末尾
private void enqueue(Node<E> node) {
// 该语句等价转化为:
// last.next = node;
// last = last.next;
last = last.next = node;
}
(2)dequeue
// 从头部删除一个节点
private E dequeue() {
// 注意:head指向的是哑铃节点
Node<E> h = head;
Node<E> first = h.next;
// 不直接将h.next置为null是为了LinkedBlockingQueue.Itr的弱一致性
h.next = h; // help GC
head = first;
// 获取first.item作为返回值
E x = first.item;
// 将first.item置为null,作为新的哑铃节点
first.item = null;
return x;
}
(3)signalNotEmpty
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 先获取takeLock锁
// notEmpty.signal中会判断当前线程是否持有锁,所以要调用
// notEmpty的signal方法,必须持有takeLock锁,否则会抛出异常
takeLock.lock();
try {
// 调用notEmpty.signal后,会将条件队列中第一个调用了notEmpty.await或awaitNanos(nanos)
// 的线程对应的Node转移到同步队列中,前驱节点调用unpark后才算真正唤醒该线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
(4)signalNotFull
// 与signalNotEmpty类似
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
// 先获取putLock锁
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
(5)fullyLock
// 上锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
(6)fullyUnlock
// 解锁
// 以一种顺序上锁,以相反的顺序释放锁,可避免死锁
// (这里上锁顺序或解锁顺序颠倒也不会出现死锁)
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
2.5 添加元素
(1)put-无限期阻塞版
public void put(E e) throws InterruptedException {
// e不能为null
if (e == null) throw new NullPointerException();
// 约定put、take等方法中将c初始化为-1,表示入队失败
int c = -1;
// 将e封装成Node
Node<E> node = new Node<E>(e);
// 要(在队尾)添加元素,所以使用putLock锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 获取count.value的值,若为capacity则阻塞当前线程
// 可能出现当前线程被唤醒后队列大小仍是capacity的情况,当前线程会再次阻塞
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 注意:getAndIncrement中会以CAS的方式将count.value加1并返回count.value原来的值
c = count.getAndIncrement();
// 注意c是本次入队前count.value的值
// 这两行语句算是级联通知结构的一部分,若不采用级联通知,即将这两行语句替换为
// signalNotEmpty();则每次入队都要获取takeLock,调用notEmpty.signal,极大降低了性能
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c为0表示队列中有一个元素
if (c == 0)
// 为了降低添加元素相关操作对takeLock的依赖,采用了级联通知
// 级联通知:
// 只要队列中有一个元素,就通过signalNotEmpty获取takeLock锁,并调用notEmpty.signal
// 将一个消费者线程(若存在的话)加入同步队列(此时可能没有消费者线程,这种情况下,后
// 面过来的消费者线程是不会阻塞的,因为队列中已经有元素了、也可能有多个阻塞着的消费
// 者线程),后面过来的线程和同步队列中的线程竞争锁,竞争到锁的线程会执行出队操作,
// 若队列不为空,该线程会调用notEmpty.signal,如此往复......若队列为空了,消费者线程
// 会阻塞,又会通过这里的signalNotEmpty进行处理。
signalNotEmpty();
}
(2)offer(E)-非阻塞版
public boolean offer(E e) {
// e不能为null(略)
final AtomicInteger count = this.count;
// 先获取count.value的值,尝试判断一下
if (count.get() == capacity)
return false;
int c = -1;
// 与put中相同的代码不再贴出
// 当其他线程在上面if之后又添加元素时再次获取的
// count.value就为capacity,条件为false,c为-1
if (count.get() < capacity) {
// 入队
enqueue(node);
// 再次强调:c是count.value入队前的值
c = count.getAndIncrement();
// 级联通知相关
if (c + 1 < capacity)
notFull.signal();
}
// 若c仍为-1,表示入队失败,返回false,否则返回true
return c >= 0;
}
(3)offer(E,long,TimeUnit)-限期阻塞版
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 与put中相同的代码不再贴出
// 转化为纳秒数
long nanos = unit.toNanos(timeout);
while (count.get() == capacity) {
// 队列仍是满的,但阻塞时长已到,在这里跳出
if (nanos <= 0)
return false;
// 阻塞指定时长,也可能被提前唤醒
nanos = notFull.awaitNanos(nanos);
}
return true;
}
2.6 获取(删除)元素
(1)take-阻塞版
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 要(在队t头)删除元素,所以使用takeLock锁
takeLock.lockInterruptibly();
try {
// 队列为空则阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// 强调:c是count.value入队前的值
c = count.getAndDecrement();
// c为1说明经过上面的出队操作队列已空
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c为capacity说明当前队列中的元素个数是capacity-1
if (c == capacity)
// 为了降低添加元素相关操作对putLock的依赖,采用了级联通知
// 与put的级联通知是相对应的,不再介绍
signalNotFull();
return x;
}
poll()-非阻塞版和poll(long,TimeUnit)-限期阻塞版与offer(E)-非阻塞版和offer(E,long,TimeUnit)-限期阻塞版是相对应的,不再介绍。
(2)remove-直接删除元素
// 注意take、poll等方法是获取元素,remove是直接删除队列中第一个满足o.equals(p.item)的元素
public boolean remove(Object o) {
// 队列中的元素不允许为null,所以这里o也不能为null
if (o == null) return false;
// 因为要操作的是整个队列,所以使用双锁
fullyLock();
try {
// 遍历链表
// trail是p的前一个节点,trail初始为哑铃节点
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
// 找到满足o.equals(p.item)的节点,断开该节点
unlink(p, trail);
// 已移除一个,返回true
return true;
}
}
// 未找到匹配的节点,返回false
return false;
} finally {
// 释放双锁
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// 为了LinkedBlockingQueue.Itr的弱一致性,不将p.next置为null
p.item = null;
// 进行连接
trail.next = p.next;
// p是否是最后一个节点
if (last == p)
last = trail;
// 若队列中元素个数为capacity-1,则进行级联通知
if (count.getAndDecrement() == capacity)
notFull.signal();
}
2.7 查看元素
public E peek() {
// 条件成立说明队列为空
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
// 要查看对头元素的值,因此使用takeLock
takeLock.lock();
try {
// 获取头节点的下一个节点
// 注意头节点是哑铃节点
Node<E> first = head.next;
// 当在上面if之后,其他线程将队列中的元素取完时,这里first就为null
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
2.8 弱一致性迭代器
下面通过两个例子对LinkedBlockingQueue.Itr.nextNode中的条件进行复现,留作参考。
(1)nextNode中"s == p"为true的例子
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
lbq.put("a");
lbq.put("b");
lbq.put("c");
Iterator<String> iterator = lbq.iterator();
lbq.take();
lbq.take();
// 此处设断点,进入nextNode方法,会发现s == p成立
iterator.next();
}
(2)nextNode中"s != null && s.item == null"为true的例子
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
lbq.put("a");
lbq.put("b");
lbq.put("c");
Iterator<String> iterator = lbq.iterator();
lbq.remove("a");
lbq.take();
// 此处设断点调试,进入nextNode方法,会
// 发现s != null && s.item == null成立
iterator.next();
}