Java阻塞队列之LinkedBlockingQueue学习与探索

本篇文章,我们学习研究一下Java中的阻塞队列中的LinkedBlockingQueue。一是为了了解阻塞队列的实现原理和常用方法。二是为了后续学习研究线程池打下基础。

JDK版本:1.8
阻塞队列都实现了BlockingQueue接口,BlockingQueue的继承结构如下所示。

BlockingQueue.png

BlockingQueue的方法有四类,它们的差异体现在处理某个不能立即满足,但是在将来某个时间点可能会满足的操作的时候。举个例子:

比如向队列中插入一个元素的时候。这个时候队列可能已经满了,或者当前线程无法获得锁对象,那么这个时候四类方法会有不同的处理方式。

  • 第一类抛出异常。
  • 第二类返回一个特殊的值(根据操作类型返回null或者false)。
  • 第三类无限阻塞当前线程直到操作成功。
  • 第四类只会阻塞当前线程特定一段时间,如果在指定时间内还没有操作成功则放弃操作。

如下表所示:

表头 Throws exception Special value Blocks Times out
插入 add add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 peek()

BlockingQueue的一些性质:

  • BlockingQueue不能插入null元素。
  • BlockingQueue可以有最大容量限制。
  • BlockingQueue的实现是线程安全的。

接下来我们就学习一下BlockingQueue的一个子类LinkedBlockingQueue

LinkedBlockingQueue.png

LinkedBlockingQueue是基于链表节点的阻塞队列。可以指定最大容量,默认情况下最大容量是Integer.MAX_VALUE。元素先进先出,在队列尾部插入元素,在队列头部获取元素。

链表节点类,是一个双向链表。

static final class Node<E> {
       
    E item;

    Node<E> prev;

    Node<E> next;

    Node(E x) {
        item = x;
    }
}

LinkedBlockingQueue的部分成员变量

//队列容量限制
private final int capacity;

//队列中的元素个数
private final AtomicInteger count = new AtomicInteger();

//队列头元素
transient Node<E> head;

//队列尾元素
private transient Node<E> last;

//获取元素的时候要持有的锁
private final ReentrantLock takeLock = new ReentrantLock();

//队列非空的条件
private final Condition notEmpty = takeLock.newCondition();

//插入元素要持有的锁
private final ReentrantLock putLock = new ReentrantLock();

//队列非满的条件,
private final Condition notFull = putLock.newCondition();

构造函数

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//使用一个集合初始化LinkedBlockingQueue
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //注释1处
    last = head = new Node<E>(null);
}

前两个构造函数都会调用第三个构造函数。注释1处,我们自己初始化了虚拟的头节点和尾节点。

接下来我们看下队列常用的方法

插入数据的方法

offer(E e)

/**
 * 将指定元素插入到队列尾部。如果队列已满则不插入,返回false。插入成功返回true。
 *
 * 当使用一个有容量限制的队列的时候,相对于add方法来说,该方法更可取。因为add方法
 * 在添加元素失败的时候仅仅抛出一个异常。
 *
 * @throws NullPointerException 指定元素为 null 抛异常
 */
public boolean offer(E e) { 
    //元素不能为null
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //元素个数超过最呆容量,直接返回fasle
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //注释1处,获取锁
    putLock.lock();
    try {
        //注释2处
        if (count.get() < capacity) {
            //注释3处
            enqueue(node);
            //注释4处
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //通知生产者,队列不满,可以插入元素
                notFull.signal();
        }
    } finally {
        //注释5处
        putLock.unlock();
    }
    if (c == 0)
        //c == 0说明队列中至少有一个元素,通知消费者,队列不为空
        signalNotEmpty();
    //c>=0说明插入成功
    return c >= 0;
}

注释1处,获取锁,如果不能立即获取到锁,则阻塞当前线程。
注释2处,判断,只有当前队列中元素数量小于最大限制才执行插入操作。
注释3处,将元素插入到队列末尾。
注释4处,原子性的将count加1。

c = count.getAndIncrement();

一定要注意,返回值是加之前的值并不是加之后的值。

注释5处,释放锁。

到这里我们应该看出来了,阻塞队列是如何实现阻塞功能的呢?,两个字:加锁

enqueue(Node<E> node)方法

private void enqueue(Node<E> node) {
    //将last的next赋值为node,然后将last赋值为last.next
    last = last.next = node;
}

offer(E e, long timeout, TimeUnit unit),在指定时间内将元素插入到队列末尾。

/**
 * 将指定元素插入到队列末尾,如果队列已满的话,则等待,如果在指定时间队列还是满的,则不执行插入操作,返回false。
 *
 * @return {@code true} 插入成功返回ture,如果在指定时间队列还是满的,说明无法执行插入操作,返回false。
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    //初始的等待时常
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //注释1处
    putLock.lockInterruptibly();
    try {
        //如果队列中元素数量已经到达最大限制,while循环等待
        while (count.get() == capacity) {
            //注释2处
            if (nanos <= 0)
                return false;
            //注释3处
            nanos = notFull.awaitNanos(nanos);
        }
        //插入元素
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //通知生产者
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    if (c == 0)
        //队列中有一个元素,通知消费者
        signalNotEmpty();
    return true;
}

注释1处,获取锁,该方法是可以被中断的。
注释2处,如果条件满足,说明在获取锁以后,没有在指定的时间内插入元素,则返回false。
注释3处,这里要解释一下。

Condition的long awaitNanos(long nanosTimeout) throws InterruptedException;

方法返回值是指定的参数nanosTimeout减去等待此方法返回的时间。如果返回值大于0,那么我们应该继续等待,如果返回值小于等于0,说明我们的等待时间已经耗尽。也就是注释2处,等待时间耗尽,直接返回false。

public void put(E e),将元素插入到队列末尾,如果队列已满的话,会等待直到空间可用,然后插入数据。可以被中断。

public void put(E e) throws InterruptedException {
     if (e == null) throw new NullPointerException();
     int c = -1;
     Node<E> node = new Node<E>(e);
     final ReentrantLock putLock = this.putLock;
     final AtomicInteger count = this.count;
     putLock.lockInterruptibly();
     try {
         //注释1处
         while (count.get() == capacity) {
             notFull.await();
         }
         enqueue(node);
         c = count.getAndIncrement();
         if (c + 1 < capacity)
             notFull.signal();
     } finally {
         putLock.unlock();
     }
     if (c == 0)
         signalNotEmpty();
}

注释1处,是该方法和offer(E e)方法的区别所在。如果队列已满,该方法会一直等待队列不满,然后执行插入操作,除非中途被中断。
offer(E e)方法是不会等待的,直接返回false。

获取数据的方法

E poll(),获取队列中的第一个元素,返回值可能为null。

public E poll() {
    final AtomicInteger count = this.count;
    //队列中没有元素
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //获取锁
    takeLock.lock();
    try {
        if (count.get() > 0) {
            //注释1处
            x = dequeue();
            //注释2处
            c = count.getAndDecrement();
            //取出元素后,队列不为空,通知消费者
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    //注释3处
    if (c == capacity)
        signalNotFull();
    return x;
}

注释1处,移除队列头元素,然后将count减1。

注释2处,将元素数量减1,并返回减之前的值。

c = count.getAndDecrement();

注释3处,如果元素数量减1之前是capacity,这时候队列中元素的数量已经是capacity-1了,通知生产者可以插入数据。

//将元素从队列中移除并返回。
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 帮助GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

public E poll(long timeout, TimeUnit unit),在指定时间内获取获取队列中的第一个元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取锁,可中断
    takeLock.lockInterruptibly();
    try {
        //whle循环,队列为空,则等待
        while (count.get() == 0) {
            //等待超时,直接返回null
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        //移除队列头元素,然后将count减1
        x = dequeue();
        c = count.getAndDecrement();
        //队列不为空,通知消费者
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //队列不满,通知生产者
    if (c == capacity)
        signalNotFull();
    return x;
}

E take(),获取队列中第一个元素,如果不存在元素则一直等待。可以被中断。

public E take() throws InterruptedException {
    E x;
    int c = -1;
      final AtomicInteger count = this.count;
      final ReentrantLock takeLock = this.takeLock;
      //获取锁,可被中断
      takeLock.lockInterruptibly();
      try {
          //注释1处
          while (count.get() == 0) {
              notEmpty.await();
          }
          //获取并移除元素
          x = dequeue();
          c = count.getAndDecrement();
          if (c > 1)
              notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
      if (c == capacity)
          signalNotFull();
      return x;
}

注释1处,如果队列为空则等待。注意和E poll()方法的区别。如果队列为空E poll()方法不会等待。

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

该方法用来获取队列中的头元素,但是不会将元素从队列中移除。

移除数据的方法

public boolean remove(Object o) {
  if (o == null) return false;
     //注释1处
     fullyLock();
     try {
         for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
             if (o.equals(p.item)) {
                 //注释2处,移除元素,p就是要删除的元素,trail是要删除元素的前一个元素
                 unlink(p, trail);
                 return true;
             }
         }
         return false;
     } finally {
         fullyUnlock();
     }
}

注释1处,获取插入数据的锁和获取数据的锁。移除数据期间不允许插入和获取数据。

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

注释2处,移除元素,p就是要删除的元素,trail是要删除元素的前一个元素

void unlink(Node<E> p, Node<E> trail) {
    //将要删除元素的数据置为null
    p.item = null;
    //将要删除元素的前一个元素的next指向p的next
    trail.next = p.next;
    //如果删除的是最后一个元素,将last指向前一个元素
    if (last == p)
        last = trail;
    //通知消费者,队列不满
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351