阻塞队列
BlockingQueue
队列主要有两种:FIFO(先进先出)、LIFO(后进先出)。
再多线程环境中,队列很容实现数据共享,我们常用的"生产者"、"消费者"模型就可以通过队列来传递数据达到数据共享。但是现实中,大多数情况都是生产者产生消息的速度和消费的速度是不匹配的,就需要相应的对生产或者消费进行阻塞。当生产的消息积累到一定程度时,就需要对生产者就行阻塞,以便消费者将积累的消息进行消费。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全。BlockingQueue释放了我们的双手,他让我们不用关系什么时候去阻塞,什么时候去唤醒线程。
抛异常 | 返回false | 阻塞 | 超时,抛异常 | |
---|---|---|---|---|
插入 | add | offer | put | offer(timeout) |
移除 | take | remove | poll(timeout) | |
检查 | contains |
操作方法:
//--------添加 ----------
boolean add(E e); //添加元素,加不了抛异常
boolean offer(E e); //添加元素,加不了返回false
void put(E e) throws InterruptedException; //添加元素,加不了一直阻塞
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException; //添加元素,达到指定时间没有加入抛异常
// -------移除-----------
boolean remove(Object o);
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
// ------
常见的BlockingQueue
有界性 | 锁 | 数据结构 | |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | ArrayList |
LinkedBlockingQueue | optionally-bounded | 加锁 | LinkedList |
DelayQueue | unbounded | 加锁 | heap |
PriorityBlockingQueue | unbounded | 加锁 | heap |
SynchronousQueue | bounded | 加锁 | 无 |
1. ArrayBlockingQueue
基于数组实现的有界阻塞安全线程队列。
构造函数
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//初始化重入锁
lock = new ReentrantLock(fair);
//初始化读、写等待队列
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//初始化构造器
this(capacity, fair);
//获取重入锁
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//初始化元素中的个数
count = i;
//插入元素的下标索引
putIndex = (i == capacity) ? 0 : i;
} finally {
//释放锁
lock.unlock();
}
}
相关属性
final Object[] items; //存放元素的数组
int takeIndex; //取元素的下标索引
int putIndex; //存元素的下标索引
int count; //数组中的元素个数
final ReentrantLock lock; //数据读取的可重入锁
private final Condition notEmpty; //读等待的队列
private final Condition notFull; //写等待的队列
核心函数
put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//如果当前线程没有中断,就将当前线程锁定
lock.lockInterruptibly();
try {
//当前队列已经满了就一直等待
while (count == items.length)
notFull.await();
//插入元素
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2. LinkedBlockingQueue
基于链表实现的阻塞队列
构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
相关属性
private final int capacity; //元素个数
private final AtomicInteger count = new AtomicInteger(); //
transient Node<E> head; //头节点
private transient Node<E> last; //尾节点
private final ReentrantLock takeLock = new ReentrantLock(); //读可重入锁
private final Condition notEmpty = takeLock.newCondition(); //读等待队列
private final ReentrantLock putLock = new ReentrantLock(); //写可重入锁
private final Condition notFull = putLock.newCondition(); //写队列
核心函数
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//获取写的可重入锁
final ReentrantLock putLock = this.putLock;
//线程安全的原子操作类
final AtomicInteger count = this.count;
//判断线程是否中断
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//如果加1后还小于当前容量,则唤醒一个等待的线程
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
3. DelayQueue
DelayQueue每次都是将元素加入排序队列,以delay/过期时间为排序因素,将快过期的元素放在队首,取数据的时候每次都是先取快过期的元素。
构造方法
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
相关属性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); //根据队列里某些元素排序的有序队列
private final Condition available = lock.newCondition();
private Thread leader = null;
核心函数
offer
public boolean offer(E e) {
//获取可重入锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//将元素加入优先级队列中
q.offer(e);
//如果当前元素为队首,将leader=null,唤起其他线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
//释放锁
lock.unlock();
}
}
take
public E take() throws InterruptedException {
//获取可重入锁
final ReentrantLock lock = this.lock;
//判断当前线程是否中断,没有中断就将当前线程锁定
lock.lockInterruptibly();
try {
//循环执行
for (;;) {
E first = q.peek();
//如果队首为空,阻塞当前线程
if (first == null)
available.await();
else {
//获取当前元素过期时间
long delay = first.getDelay(NANOSECONDS);
//小于等于0 直接弹出
if (delay <= 0)
return q.poll();
//将first 只为null,避免内存泄漏
first = null; // don't retain ref while waiting
if (leader != null)
//阻塞当前线程
available.await();
else {
//将当前线程赋值给leader,然后阻塞delay时间,等待队首元素达到可出队时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
//释放leader引用
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader元素为空,优先级队列不为空唤起其他线程
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}
4. PriorityBlockingQueue
无界优先队列
构造函数
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
相关属性
private static final int DEFAULT_INITIAL_CAPACITY = 11; //默认容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
private transient Object[] queue; //存放元素数组
private transient int size; //元素个数
private transient Comparator<? super E> comparator; //比较器
private final ReentrantLock lock; //可重入锁
private final Condition notEmpty; //非空条件
private transient volatile int allocationSpinLock; //扩容时,CAS更新这个值谁更新成功谁执行
private PriorityQueue<E> q;//不阻塞的优先队列,用于序列化/反序列化
核心函数
offer
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//判断是否需要扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//根据是否有比较器选择不同方法
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//唤醒notEmpty条件
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 取父节点
int parent = (k - 1) >>> 1;
// 父节点的元素值
Object e = array[parent];
// 如果key大于父节点,堆化结束
if (key.compareTo((T) e) >= 0)
break;
// 否则,交换二者的位置,继续下一轮比较
array[k] = e;
k = parent;
}
// 找到了应该放的位置,放入元素
array[k] = key;
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
E result;
try {
// 队列没有元素,就阻塞在notEmpty条件上
// 出队成功,就跳出这个循环
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 解锁
lock.unlock();
}
// 返回出队的元素
return result;
}
private E dequeue() {
// 元素个数减1
int n = size - 1;
if (n < 0)
// 数组元素不足,返回null
return null;
else {
Object[] array = queue;
// 弹出堆顶元素
E result = (E) array[0];
// 把堆尾元素拿到堆顶
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 并做自上而下的堆化
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 修改size
size = n;
// 返回出队的元素
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
// 只需要遍历到叶子节点就够了
while (k < half) {
// 左子节点
int child = (k << 1) + 1; // assume left child is least
// 左子节点的值
Object c = array[child];
// 右子节点
int right = child + 1;
// 取左右子节点中最小的值
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key如果比左右子节点都小,则堆化结束
if (key.compareTo((T) c) <= 0)
break;
// 否则,交换key与左右子节点中最小的节点的位置
array[k] = c;
k = child;
}
// 找到了放元素的位置,放置元素
array[k] = key;
}
}
5. SynchronousQueue
双栈双队列算法,一个写SynchronousQueue需要和一个读SynchronousQueue组队出现
构造方法
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
相关属性
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
核心方法
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}