LinkedBlockingQueue
数据结构
单向链表数据结构,代码如下:
static class Node<E> {
// 节点数据
E item;
// 指向下一个节点的引用
Node<E> next;
// Constructor
Node(E x) { item = x; }
}
LinkedBlockingQueue实现原理
是基于单向链表数据结构的BlockingQueue,为了保证线程安全,定义了两把锁(ReentrantLock
):putLock
(入队操作的锁)和takeLock
(出队操作的锁)。还有两个条件变量(Condition):notEmpty
(takeLock
锁所持有)和notFull
(putLock
锁所持有)。还有一个count
属性(AtomicInteger
),表示队列中的元素数量。
元素入队(put
)和出列('take')两个核心操作:
- 当入队时(put方法),首先需要获取
putLock
锁,循环判断当前队列中的元素数量和capacity
是否相等(表示队列已满),如果相等,则调用notFull.await
方法,调用该方法的线程进入阻塞状态并释放putLock
锁。如果队列未满,则创建Node
将该NoDE
指向队列尾部,并将count + 1
。如果当前队列未满则调用notFull.signal
方法,唤醒其它需要入队的线程。当count == 0
时,调用notEmpty.signal
方法,唤醒其它需要出队的线程。 - 当出队时(take方法),首先需要获取
takeLock
锁,循环判断当前队列是否为空,如果为空,则调用notEmpty.await
方法,调用该方法的线程进入阻塞状态并释放takeLock
锁。如果队列还有元素,则出队,并将count - 1
。如果当前队列仍有元素,则调用notEmpty.signal
方法,唤醒其它需要出队的线程。当count == capacity
时,调用notFull.signal
方法,唤醒其它需要入队的线程。
LinkedBlockingQueue核心源码解读
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 队列的容量
private final int capacity;
// 当前队列中元素的个数,线程安全的
private final AtomicInteger count = new AtomicInteger();
// 队列的头节点, head.item == null
transient Node<E> head;
// 队列的尾节点,last.next == null
private transient Node<E> last;
// 当调用take、pool等方法时,需要持有的锁(takeLock)
private final ReentrantLock takeLock = new ReentrantLock();
// 从队列删除元素时,所持有锁的条件,既队列非空情况下。
private final Condition notEmpty = takeLock.newCondition();
// 当调用put、offer等方法时,需要持有的锁(putLock)
private final ReentrantLock putLock = new ReentrantLock();
// 当从队列中插入一个元素时,所持有锁的条件,既队列非满情况下。
private final Condition notFull = putLock.newCondition();
// 当有线程调用put/offer方法时,此时队列为非空,需要调用该方法去通知(唤醒)调用了take/poll方法的线程。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 通知调用了take或poll方法的线程await
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
// 当有线程调用take或poll方法时,此时队列为非满,需要调用该方法去通知(唤醒)调用了put或offer方法的线程。
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 通知调用了take或poll方法
notFull.signal();
} finally {
putLock.unlock();
}
}
// 入队,向队尾中插入元素
private void enqueue(Node<E> node) {
last = last.next = node;
}
// 出队,从队头中删除元素
private E dequeue() {
Node<E> h = head; // head
Node<E> first = h.next; // first为新的head
h.next = h; // help GC h.next 指向 h h节点变成孤儿节点
head = first; // head 指向first
E x = first.item;
first.item = null;
return x;
}
// 获取两把锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
// 释放两把锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
// 默认构造器,创建Integer.MAX_VALUE容量的队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 指定容量大小的队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 将指定的集合中的元素插入到队列中,capacity == Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 永远不会有锁竞争,但是需要保证可见性
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
// 入队
enqueue(new Node<E>(e));
++n;
}
// 将count设置 c.size
count.set(n);
} finally {
putLock.unlock();
}
}
// 返回限列中的元素数量
public int size() {
return count.get();
}
// 队列中剩余的容量大小
public int remainingCapacity() {
return capacity - count.get();
}
// 队列中插入元素,如果队列已满一直阻塞到队列有剩余空间。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1; // 默认状态,表示put失败
Node<E> node = new Node<E>(e); // 创建Node
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 支持中断的获取锁
putLock.lockInterruptibly();
try {
// 注意这里一定用while,而不是if。当队列已满,则该方法进入阻塞状态
while (count.get() == capacity) {
notFull.await();
}
// 队列没满,元素入队
enqueue(node);
// 队列元素个数 + 1
c = count.getAndIncrement();
// 如果当前队列没满,通知其它调用put或offer线程可以插入元素了
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c == 0 代表队列中已经有一个元素,通知调用take或poll的线程可以拿元素
// TODO c > 0 为什么不通知?
if (c == 0)
signalNotEmpty(); // notEmpty.signal();
}
// 指定的超时时间间隔内在队列尾部插一个元素,如果超时则返回false。阻塞
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
// 超时时间的纳秒数
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 注意这里一定用while,而不是if。当队列已满,则该方法进入阻塞状态
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 队列没满,元素入队
enqueue(new Node<E>(e));
// 队列元素个数 + 1
c = count.getAndIncrement();
// 如果当前队列没满,通知其它调用put或offer线程可以插入元素了
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果队列已满直接返回
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 只有队列未满时才可以将元素入队
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取takelock锁
takeLock.lockInterruptibly();
try {
// 如果队列为空,则进入阻塞状态,并释放锁
// 当有其它线程唤醒当前线程,注意还需要循环验证一下队列是否为空,因为可能有其它线程拿走元素使队列又为空
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// 计数减一
c = count.getAndDecrement();
// 如果队列个数大于,则唤醒其它调用take或poll操作的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列中的元素个数等于capacity,则唤醒调用put或offer的线程。(因为该操作拿走一个元素,此时队列的个数是capacity - 1。
if (c == capacity)
signalNotFull();
return x;
}
// 和take方法逻逻一样,只是调用带超时的await方法,当超过一段时间内没有被唤醒则放弃该操作
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 该操作不会被阻塞(也就是叫不会调用await方法)。如果队列为空则返回null
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 从队列的头部获取元素,注意它需要获取takeLock锁,如果不加锁,有可能读取null值。因为可能有可能其它线程调用了take方法导致head发生变化。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
// 获取takeLock,处于阻塞状态
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
}