前言
LinkedBlockingQueue
是一个由链表结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序.
本文源码: 源码地址
例子
本文先以一个小例子简单看看
LinkedBlockingQueue
的简单使用. 分别有两个类Producer
负责产生新数据,Consumer
负责消费数据. 例子中有两个消费者和三个生产者,每个生产者生成3
条数据.
package com.linkedblockingqueue;
public class Test01 {
static LinkedBlockingQueue lbq = new LinkedBlockingQueue(5);
public static void main(String[] args) {
Consumer consumer01 = new Consumer("consumer01");
Consumer consumer02 = new Consumer("consumer02");
Producer producer01 = new Producer("producer01");
Producer producer02 = new Producer("producer02");
Producer producer03 = new Producer("producer03");
consumer01.start();
consumer02.start();
producer01.start();
producer02.start();
producer03.start();
}
static class Consumer extends Thread {
Consumer(String name) {super(name);}
public void run() {
try {
while (true) {
System.out.println(Thread.currentThread().getName() + " gets " + lbq.take());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Producer extends Thread {
Producer(String name) {super(name);}
public void run() {
try {
for (int i = 0; i <3; i++) {
lbq.put(Thread.currentThread().getName() + "-" + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
结果如下: 只要不重复消费数据即可.
consumer01 gets producer01-0
consumer02 gets producer01-1
consumer01 gets producer01-2
consumer02 gets producer02-0
consumer01 gets producer02-1
consumer02 gets producer02-2
consumer01 gets producer03-0
consumer02 gets producer03-1
consumer01 gets producer03-2
实现思路
首先想想如果不存在并发的问题,如何用链表实现一个队列呢,很简单,就是维护一个链表, 当要加入数据到队列中时,就生成一个节点往链表尾部插入并更新尾节点即可,当要取数据时从链表头部取数据并更新头节点.
当需要考虑并发的时候, 有几种方式,一种是利用
CAS
,volatile
,比如ConcurrentLinkedQueue
;另一种是加锁,本文中的LinkedBlockingQueue
即采用加锁的方式进行操作的.
LinkedBlockingQueue
采用了ReentrantLock
和Condition
的方式来进行操作的, 如果不了解可以看一下我的另外两篇博客[Java源码][并发J.U.C]---解析Condition 和 [Java源码][并发J.U.C]---用代码一步步实现ReentrantLock.
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
当需要往队列尾部插入数据时需要获得
putLock
锁, 当需要从队列头部获取数据时需要获得takeLock
锁, 需要删除队列中某个元素时需要同时获得putLock
锁和takeLock
锁.
通俗得理解就是操作队列尾部需要putLock
锁,操作队列头部需要takeLock
锁,操作整个队列需要putLock
和takeLock
.
当队列中元素已满时,此时如果插入元素,会调用
notFull.await()
进行等待, 如果队列中元素没满时,需要调用notFull.signal()
给那些之前因为队列满无法插入元素的休眠线程信号. 另外当队列元素由空到有的那个过程中需要调用一次notEmpty.signal()
去给那些因为队列是空没有取得元素导致休眠(因为队列空此时会调用notEmpty.await()
导致休眠)的线程信号.
同样的道理, 在获取元素时, 如果队列为空,会调用
notEmpty.await()
导致该线程休眠,如果获取元素后队列不为空,会调用notEmpty.signal()
去给那些因为队列为空而休眠的线程信号. 另外当队列是从满到不满的过程中需要调用一次notFull.signal()
去给那些因为队列满而无法put,poll
操作的线程信号.
如果有点绕口的话我们就看看源代码会比较帮助理解.
源代码
属性
表示节点的类
Node
.
capacity
: 队列的容量
count
: 当前队列的容量
head
: 链表头节点
last
: 链表尾节点
/**
* 链表的节点
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 容量 如果不指定则为Integer.MAX_VALUE */
private final int capacity;
/** 链表中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/**
* 链表的头部 head.item 为 null
*/
transient Node<E> head;
/**
* 链表的尾部 last.next 为 null
* Invariant: last.next == null
*/
private transient Node<E> last;
初始化
初始化的时候链表头尾节点相同并且其值为
null
.
/**
* 无参构造函数 capacity默认为Integer.MAX_VALUE
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 有参构造函数 并且初始化头节点和尾节点
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
辅助方法
包括进出队列,同时获得释放两个锁,和给等待的取操作或放操作信号等等.在下面要讨论的插入和获取元素会用到.
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
* signal一个正在等待的take操作
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Signals a waiting put. Called only from take/poll.
* Signal一个等待的put操作
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* 将node节点入队列
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* 从链表中返回一个节点 并更新头节点
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 同时获得putLock和takeLock
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* 同时释放putLock和takeLock
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
插入元素
put()
方法: 将指定的元素插入此队列的尾部,如果空间已满则等待其空间变为可用.
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* 将指定的元素插入此队列的尾部,如果空间已满则等待其变为可用
* 获得锁的过程中可以响应中断
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果当前队列容量已满 则该线程释放锁并休眠
while (count.get() == capacity) {
notFull.await();
}
// 往队列尾部插入一个节点
enqueue(node);
// 注意c是先get再increment
c = count.getAndIncrement();
// 增加这个元素后如果队列还没有满则给休眠的线程发信号
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
/**
* 表示唤醒给取操作(take,poll)信号
* 因为现在给空的队列中放入了一个元素
*/
if (c == 0)
signalNotEmpty();
}
从代码中可以看到
put
方法是肯定会成功的,除非获得锁或者在休眠过程中被中断(此时会抛出中断异常).
与
put
对应的还有两个插入元素的方法offer(E e, long timeout, TimeUnit unit) throws InterruptedException
和offer(E e)
.
offer(E e)
方法是在如果队列不为空才插入返回true
,如果为空则立马返回false
,另外它不响应中断.
offer(E e, long timeout, TimeUnit unit) throws InterruptedException
如果队列为空时会等到timeout
后才返回false
,另外它与put
一样在获得锁或者在休眠过程会响应中断.
获取元素
与
put
对应的是take
,与offer
对应的是poll
.
/**
* 如果队列不为空 取队列中的第一个元素
* 否则一直等待
*
* @return 队列中的第一个元素
* @throws InterruptedException 获得锁的过程中线程被中断
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,则一直等待
while (count.get() == 0) {
notEmpty.await();
}
// 从队列中取进入时间最长的元素
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
/**
* 表示唤醒给放操作(put,offer)信号
* 因为现在给满的队列中消费了一个元素
*/
if (c == capacity)
signalNotFull();
return x;
}
另外注意
dequeue
中删除元素是将头节点的下一个节点的item
先保存好然后将它的item
属性设置为null
,该节点会成为新的头节点,而原先的头节点的next
会指向自己. 这个小细节在Itr
中的nextNode
方法会用到该特性.
对应的
poll
方法和peek
方法就不多说了,
删除元素
/**
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
/**
* 删除元素o
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
// p 是要被删除的节点, trail是p节点的前驱节点
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
需要注意两点
1. 在remove
方法中是需要获得两个锁takeLock
和putLock
,因为这是在操作整个链表.
2. 由于是删除元素,可以间接性的认为是在消费队列,所以如果队列是从满到不满的一个过程则需要调用notFull.signal()
方法.(在unlink
方法中)
与此类似的还有
contains(Object o)
和clear()
方法就不多说了.
drainTo
该方法的意思是从队列中消费指定的个数(默认是
Integer.MAX_VALUE
)并加入到指定的容器中.
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
/**
*
* 将元素都取到c中 n = Math.min(maxElements, 当前队列中的元素个数);
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
// 消费n个元素并且加入到容器c中
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
// 判断是否是一个满到不满的过程
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
遍历元素
Iterator
很常规的方法,由于是操作整个链表,因此需要同时获取的是两个锁.
参考
1.
Java1.8
源码.