简介
PriorityBlockingQueue 是优先级阻塞队列,虽然我们称它为无界,实际上它也是有界的。它跟PriorityQueue 最大的区别在于他是线程安全的,在入队出队时使用同一把锁,在扩容时先解锁,再使用cas原子操作,再重新获取锁。
PriorityBlockingQueue 类
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
只继承AbstractQueue抽象类
PriorityBlockingQueue 属性
// 默认初始长度
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大长度
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 元素数组
private transient Object[] queue;
// 元素个数
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 锁
private final ReentrantLock lock;
// 空限制条件
private final Condition notEmpty;
// 自旋扩容锁
private transient volatile int allocationSpinLock;
// 序列化
private PriorityQueue<E> q;
PriorityBlockingQueue 构造函数
无参构造函数
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
带有初始长度构造函数
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
有初始长度和比较器的构造函数
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
// 初始化非公平锁
this.lock = new ReentrantLock();
// 初始化空监控条件
this.notEmpty = lock.newCondition();
// 比较器
this.comparator = comparator;
// 初始化元素数组
this.queue = new Object[initialCapacity];
}
线性集合的构造函数
public PriorityBlockingQueue(Collection<? extends E> c) {
// 初始化非公平锁
this.lock = new ReentrantLock();
// 初始化空监控条件
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
// 找线性集合比较器
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
// 原来就是阻塞优先级队列,找原比较器
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
// 线性集合转数组
Object[] a = c.toArray();
int n = a.length;
// 兼容老版本BUG
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
// 查找是否有空元素,有空元素抛异常
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 初始化数组和元素个数
this.queue = a;
this.size = n;
// 数组中建堆
if (heapify)
heapify();
}
PriorityBlockingQueue 添加
优先级队列属于无界队列,忽略满时抛异常
public boolean add(E e) {
return offer(e);
}
添加元素
public boolean offer(E e) {
// 空元素抛异常
if (e == null)
throw new NullPointerException();
// 初始化锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 达到当前数组长度尝试扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 使用自然排序
siftUpComparable(n, e, array);
else
// 使用比较器排序
siftUpUsingComparator(n, e, array, cmp);
// 长度增加
size = n + 1;
// 放开空限制条件
notEmpty.signal();
} finally {
// 解锁
lock.unlock();
}
return true;
}
优先级队列属于无界队列,忽略等待
public void put(E e) {
offer(e); // never need to block
}
优先级队列属于无界队列,忽略超时
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
PriorityBlockingQueue 扩容
private void tryGrow(Object[] array, int oldCap) {
// 先解锁(后面解释)
lock.unlock();
Object[] newArray = null;
// 使用原则锁CAS
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 数组扩容逻辑跟PriorityQueue一样,
// 数组长度小于64时,新长度=原长度*2+2
// 数组长度大于等于64时,新长度=原长度*1.5
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 新长度大于默认长度最大值
if (newCap - MAX_ARRAY_SIZE > 0) {
// 使用原长度加1策略
int minCap = oldCap + 1;
// 是否越界,越界抛异常
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
// 不越界就使用数组最大值作为新长度
newCap = MAX_ARRAY_SIZE;
}
// queue == array 必须没有其他线程操作过
if (newCap > oldCap && queue == array)
// 创建新数组
newArray = new Object[newCap];
} finally {
// 重置锁标志
allocationSpinLock = 0;
}
}
// 未拿到cas锁
if (newArray == null)
// 让出当前CPU
Thread.yield();
// 重新获取锁
lock.lock();
// 拷贝数组(queue == array限制没有操作过)
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
为什么要解锁呢?我们先来看看不解锁会怎么样,其他写的线程和读的线程全部都在等待获取锁,然后这边在慢慢拷贝数组。往细了说,扩容应该只影响写,如果有大量的读,其实扩容是可以取消的,有人取过数据(queue == array)将不再成立,这是可以不用扩容了(有空位了)。这里先解锁,在用cas加锁计算新长度创建新数组,然后重新获得锁,完全就是为了性能考虑。
PriorityBlockingQueue 出队
出队
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 取元素
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
出队,为空就等待
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 为空等待
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 解锁
lock.unlock();
}
return result;
}
出队,为空等待,超时返回false
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 等待时间
long nanos = unit.toNanos(timeout);
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 为空时等待一段时间,超时在读
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
// 解锁
lock.unlock();
}
return result;
}
真正出队
private E dequeue() {
// 元素个数减1
int n = size - 1;
// 为空返回null
if (n < 0)
return null;
else {
// 获取数组
Object[] array = queue;
// 取头
E result = (E) array[0];
// 取最后一个
E x = (E) array[n];
// 最后一个置空
array[n] = null;
Comparator<? super E> cmp = comparator;
// 用尾替换头并重新平衡
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
PriorityBlockingQueue 查询方法
public E peek() {
// 先获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 数组为空返回null
return (size == 0) ? null : (E) queue[0];
} finally {
// 解锁
lock.unlock();
}
}
PriorityBlockingQueue 获取长度
public int size() {
// 先获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回长度
return size;
} finally {
// 解锁
lock.unlock();
}
}