BlockingQueue详解

在Java中,BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

  • 入队

    offer(E e):如果队列没满,立即返回true;如果队列满了,立即返回 false --> 不阻塞

    put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中 断 --> 阻塞

  • 出队

    poll():如果没有元素,直接返回null;如果有元素,出队 --> 不阻塞

    take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断 --> 阻塞

一、LinkedBlockingQueue

LinkedBlockingQueue可以指定容量,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。

1、底层数据结构

LinkedBlockingQueue内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。如果队列为空时,头节点的next参数为null。尾节点的next参数也为null。

2、主要变量

// 容量限制,如果没有指定,则为 Integer.MAX_VALUE
private final int capacity;

// 当前队列中的元素数量
private final AtomicInteger count = new AtomicInteger();

// 队列头节点,始终满足head.item == null
transient Node<E> head;

// 队列的尾节点,始终满足last.next == null
private transient Node<E> last;

// 由 take、poll 等持有的锁
private final ReentrantLock takeLock = new ReentrantLock();

// 当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();

// 由 put、offer 等持有的锁
private final ReentrantLock putLock = new ReentrantLock();

// 当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();

3、put(E e)方法

put(E e)方法用于将一个元素插入到队列的尾部。在队列满时会阻塞,直到队列中有元素被取出。

// 在此队列的尾部插入指定元素,如有必要,等待空间可用。
public void put(E e) throws InterruptedException {
    // 不允许元素为null
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    // 以当前元素新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获得入队的锁
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,那么将该线程加入到Condition的等待队列中
        while (count.get() == capacity) {
            notFull.await();
        }
        // 当队列未满,然后有出队线程取出导致,将节点入队
        enqueue(node);
        // 得到插入之前队列的元素个数。getAndIncrement返回的是 +1 前的值
        c = count.getAndIncrement();
        // 如果还可以插入元素,那么释放等待的入队线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放入队的锁
        putLock.unlock();
    }
    // 如果插入队列之前元素个数为0,插入后就通知出队线程队列非空
    if (c == 0)
        signalNotEmpty();
}

put方法总结:

1、LinkedBlockingQueue不允许插入的元素为null;

2、同一时刻只有一个线程可以进行入队操作,putLock在将元素插入队列尾部时加锁了;

3、如果队列满了,则会调用notFull.await(),将该线程加入到Condition等待队列中。await方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;

4、一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部;

5、然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知notFull条件的等待队列中的线程;

6、如果插入队列前个数为0,那现在插入后,就为1了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。

4、E take()方法

take()方法用于得到队头的元素,在队列为空时会阻塞,直到队列中有元素可取。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列中元素数量为0,则将出队线程加入到notEmpty队列中进行等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 得到到队头的元素
        x = dequeue();
        // 得到出队列前元素的个数。getAndDecrement返回的是 -1 前的值
        c = count.getAndDecrement();
        // 如果出队列前的元素数量大于1,那说明还可以继续取,那就释放在notEmpty队列的第一个线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放出队锁
        takeLock.unlock();
    }
    // 如果出队前队列是满的,那现在取走一个元素了,队列就不满了,就可以去通知等待中的入队线程了。
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法总结:

1、同一时刻只有一个线程可以进行出队操作,takeLock在出队之前加锁了;

2、如果队列中元素为空,那就进入notEmpty队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下notEmpty队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在notFull队列中等待插入的线程进行put了。

5、remove()方法

remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回 false ;有的话则删除并返回true。入队和出队都是只获取一个锁,而 remove()方法需要同时获得两把锁,

public boolean remove(Object o) {
    // 因为队列不包含null元素,返回false
    if (o == null) return false;
    // 获取两把锁
    fullyLock();
    try {
        // 从头的下一个节点开始遍历
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果匹配,那么将节点从队列中移除,trail表示需要删除节点的前一节点
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 释放两把锁
        fullyUnlock();
    }
}
/**
 * 锁定以防止 put 和 take.
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

/**
 * 解锁以允许 put 和 take.
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

6、总结

LinkedBlockingQueue允许两个线程同时在两端进行入队和出队操作,但一端同时只能有一个线程进行操作,是通过两个锁进行区分的。

为了维护底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。

二、ArrayBlockingQueue

// 构造方法
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// 创建具有给定(固定)容量和指定访问策略的ArrayBlockingQueue 。
// 参数:capacity —— 这个队列的容量
        fair     —— 如果为true ,则在插入或删除时阻塞的线程的队列访问将按 FIFO 顺序处理;如果为false ,则未指定访问顺序。
public ArrayBlockingQueue(int capacity, boolean fair) {
    //  如果capacity < 1,抛出异常
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

1、底层数据结构

ArrayBlockingQueue内部是使用数组实现一个队列的,并且在构造方法中就需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。

2、主要变量

// 元素数组,其长度在构造方法中指定
final Object[] items;

// 队列中实际的元素数量
int count;

// 保护所有通道的主锁
final ReentrantLock lock;

// 等待take条件的队列
private final Condition notEmpty;

// 等待put条件的队列
private final Condition notFull;

3、put(E e)

在此队列的尾部插入指定元素,如果队列已满,则等待空间可用

public void put(E e) throws InterruptedException {
    // 检查元素是否为null,如果是,抛出NullPointerException
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队里中的元素数量等于数组长度,则队列已满,阻塞,等待队列成为不满状态
        while (count == items.length)
            notFull.await();
        // 将元素入队
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}

put方法总结:

1、ArrayBlockingQueue不允许添加null元素;

2、ArrayBlockingQueue在队列已满的时候,会调用notFull.await(),释放锁并处于阻塞状态;

3、一旦ArrayBlockingQueue在队列不满的时候,就立即入队。

4、E take()

取出此队列的头部元素,如果队列空,则阻塞,等待元素可取。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队列中元素数量为0时,则进入阻塞状态
        while (count == 0)
            notEmpty.await();
        // 队列不为空是,调用dequeue()出队
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

take方法总结:

1、取元素时,一旦获得锁,队列为空, 则会阻塞,直至不为空,调用dequeue()出队。

5、总结

ArrayBlockingQueue是一个底层结构是数组的阻塞队列,是通过 ReentrantLockCondition 来实现的。不可插入为null的元素,入队和出队使用的是同一个锁。意味着同一时刻只能有一个线程能进行入队或者出队操作。入队时,队列已满则会调用notFull.await(),进入阻塞状态。直到队列不满时,再进行入队操作。当出队时,队列为空,则调用notEmpty.await(),进入阻塞状态,直到队列不为空时,则出队。

三、LinkedBlockingQueue和ArrayBlockingQueue的区别

1、底层实现不同

LinkedBlockingQueue底层实现是链表,ArrayBlockingQueue底层实现是数组

2、队列容量

LinkedBlockingQueue默认的队列长度是Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多;

ArrayBlockingQueue必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。

3、锁的数量

LinkedBlockingQueue有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。

ArrayBlockingQueue只有一把锁,同时只能有一个线程进行入队和出队操作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352