数据结构 - PriorityBlockingQueue 优先级阻塞队列

简介

PriorityBlockingQueue 是优先级阻塞队列,虽然我们称它为无界,实际上它也是有界的。它跟PriorityQueue 最大的区别在于他是线程安全的,在入队出队时使用同一把锁,在扩容时先解锁,再使用cas原子操作,再重新获取锁。

PriorityBlockingQueue 类

public class PriorityBlockingQueue<E> extends AbstractQueue<E>

只继承AbstractQueue抽象类

PriorityBlockingQueue 属性

// 默认初始长度
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大长度
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 元素数组
private transient Object[] queue;
// 元素个数
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 锁
private final ReentrantLock lock;
// 空限制条件
private final Condition notEmpty;
// 自旋扩容锁
private transient volatile int allocationSpinLock;
// 序列化
private PriorityQueue<E> q;

PriorityBlockingQueue 构造函数

无参构造函数

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

带有初始长度构造函数

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

有初始长度和比较器的构造函数

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    // 初始化非公平锁
    this.lock = new ReentrantLock();
    // 初始化空监控条件
    this.notEmpty = lock.newCondition();
    // 比较器
    this.comparator = comparator;
    // 初始化元素数组
    this.queue = new Object[initialCapacity];
}

线性集合的构造函数

public PriorityBlockingQueue(Collection<? extends E> c) {
    // 初始化非公平锁
    this.lock = new ReentrantLock();
    // 初始化空监控条件
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) {
        // 找线性集合比较器
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        // 原来就是阻塞优先级队列,找原比较器
        PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    // 线性集合转数组
    Object[] a = c.toArray();
    int n = a.length;
    // 兼容老版本BUG
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    // 查找是否有空元素,有空元素抛异常
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    // 初始化数组和元素个数
    this.queue = a;
    this.size = n;
    // 数组中建堆
    if (heapify)
        heapify();
}

PriorityBlockingQueue 添加

优先级队列属于无界队列,忽略满时抛异常

public boolean add(E e) {
    return offer(e);
}

添加元素

public boolean offer(E e) {
    // 空元素抛异常
    if (e == null)
        throw new NullPointerException();
    // 初始化锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 达到当前数组长度尝试扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 使用自然排序
            siftUpComparable(n, e, array);
        else
            // 使用比较器排序
            siftUpUsingComparator(n, e, array, cmp);
        // 长度增加
        size = n + 1;
        // 放开空限制条件
        notEmpty.signal();
    } finally {
        // 解锁
        lock.unlock();
    }
    return true;
}

优先级队列属于无界队列,忽略等待

public void put(E e) {
    offer(e); // never need to block
}

优先级队列属于无界队列,忽略超时

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

PriorityBlockingQueue 扩容

private void tryGrow(Object[] array, int oldCap) {
    // 先解锁(后面解释)
    lock.unlock();
    Object[] newArray = null;
    // 使用原则锁CAS
    if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                    0, 1)) {
        try {
            // 数组扩容逻辑跟PriorityQueue一样,
            // 数组长度小于64时,新长度=原长度*2+2
            // 数组长度大于等于64时,新长度=原长度*1.5
            int newCap = oldCap + ((oldCap < 64) ?
                    (oldCap + 2) : // grow faster if small
                    (oldCap >> 1));
            // 新长度大于默认长度最大值
            if (newCap - MAX_ARRAY_SIZE > 0) {
                // 使用原长度加1策略
                int minCap = oldCap + 1;
                // 是否越界,越界抛异常
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                // 不越界就使用数组最大值作为新长度
                newCap = MAX_ARRAY_SIZE;
            }
            // queue == array 必须没有其他线程操作过
            if (newCap > oldCap && queue == array)
                // 创建新数组
                newArray = new Object[newCap];
        } finally {
            // 重置锁标志
            allocationSpinLock = 0;
        }
    }
    // 未拿到cas锁
    if (newArray == null)
        // 让出当前CPU
        Thread.yield();
    // 重新获取锁
    lock.lock();
    // 拷贝数组(queue == array限制没有操作过)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

为什么要解锁呢?我们先来看看不解锁会怎么样,其他写的线程和读的线程全部都在等待获取锁,然后这边在慢慢拷贝数组。往细了说,扩容应该只影响写,如果有大量的读,其实扩容是可以取消的,有人取过数据(queue == array)将不再成立,这是可以不用扩容了(有空位了)。这里先解锁,在用cas加锁计算新长度创建新数组,然后重新获得锁,完全就是为了性能考虑。

PriorityBlockingQueue 出队

出队

public E poll() {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 取元素
        return dequeue();
    } finally {
        // 解锁
        lock.unlock();
    }
}

出队,为空就等待

public E take() throws InterruptedException {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 为空等待
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // 解锁
        lock.unlock();
    }
    return result;
}

出队,为空等待,超时返回false

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 等待时间
    long nanos = unit.toNanos(timeout);
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 为空时等待一段时间,超时在读
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        // 解锁
        lock.unlock();
    }
    return result;
}

真正出队

private E dequeue() {
    // 元素个数减1
    int n = size - 1;
    // 为空返回null
    if (n < 0)
        return null;
    else {
        // 获取数组
        Object[] array = queue;
        // 取头
        E result = (E) array[0];
        // 取最后一个
        E x = (E) array[n];
        // 最后一个置空
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 用尾替换头并重新平衡
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

PriorityBlockingQueue 查询方法

public E peek() {
    // 先获取锁    
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 数组为空返回null
        return (size == 0) ? null : (E) queue[0];
    } finally {
        // 解锁
        lock.unlock();
    }
}

PriorityBlockingQueue 获取长度

public int size() {
    // 先获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 返回长度
        return size;
    } finally {
        // 解锁
        lock.unlock();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容