linkedBlockingQueue和arrayBlockingQueue

ArrayBlockingQueue和LinkedBlockingQueue的异同

1.队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
2.数据存储容器不同,ArrayBlockingQueue采用的是数据作为数据存储容器,而LinkedBlockingQueue采用的则是以node节点为连接对象的链表。
3.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据时,对于GC可能存在很大的影响。
4.两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加和移除操作采用的同一个ReentrantLock锁,而LinkedBlockingQueue使用的锁是分离的,添加采用的是putLock,移除采用的是takeLock,这样大大提高了队列的吞吐量,意味着高并发场景下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

1 LinkedBlockingQueue 参考

1.1linkedblockingqueue的基本概要

linkedblockingqueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对LinkedBlockingQueue的API操作都是间接操作该内部数据队列。

1.2 LinkedBlockingQueue构造函数

队列默认大小是Integer.MAX_VALUE,所以在使用的时候建议手动传值,为其提供我们所需的大小,避免队列过大造成机器负载或内存饱满等情况。

三个构造函数:
第一种默认构造方法创建一个容量为 Integer.MAX_VALUE的 LinkedBlockingQueue实例。
第二种构造方法,指定了队列容量,首先判断指定容量是否大于零,否则抛出异常。然后为 capacity 赋值,最后创建空节点,并指向 head与 last,两者的 item与 next此时均为 null。
第三种,利用循环向队列中添加指定集合中的元素。

LinkedBlockingQueue是按照FIFO(先进先出)排列元素。
队列的头部元素是入队时间最长的元素,队列的尾部元素是在入队时间最短的元素。
队列执行获取操作会获得位于队列头部的元素,而新元素会被插入到队列的尾部。

1.3 内部成员

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
      implements BlockingQueue<E>, java.io.Serializable {

      static class Node<E> {
      E item;    
      Node<E> next;
      Node(E x) { item = x; }
  }

  /** 阻塞队列的大小,默认为Integer.MAX_VALUE */
  private final int capacity;

  /** 当前阻塞队列中的元素个数 */
  private final AtomicInteger count = new AtomicInteger();

  /**阻塞队列的头结点*/
  transient Node<E> head;

  /**阻塞队列的尾节点*/
  private transient Node<E> last;

  /** 获取并移除元素时使用的锁,如take, poll, etc */
  private final ReentrantLock takeLock = new ReentrantLock();

  /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
  private final Condition notEmpty = takeLock.newCondition();

  /** 添加元素时使用的锁如 put, offer, etc */
  private final ReentrantLock putLock = new ReentrantLock();

  /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
  private final Condition notFull = putLock.newCondition();
}

正常情况下,LinkedBlockingQueue的吞吐量要高于基于数组的队列ArrayBlockingQueue,因为前者的添加和删除操作使用的两个显示锁(ReentrantLock)来控制并发执行,而ArrayBlockingQueue只是使用一个ReentranLock控制并发。
两个显示锁(ReenTrantLock)一个控制头部,相当于控制新增元素;一个控制尾部,相当于控制删除元素;可以理解为读写分离。
每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加到链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock和putLock对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就大大提高吞吐量。
注意:如果没有给LinkedBlockingQueue指定容量大小,其默认值为Integer.MAX_VALUE,如果存在添加速度大于删除速度的时候,有可能会内存溢出,这点在使用前希望慎重考虑。
LinkedBlockingQueue的实现原理图和ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。

1.4 非阻塞式添加元素:add、offer方法原理

add方法
间接调用offer方法,如果成功返回true,如果失败抛出IllegalStateException队列已满的异常。

public boolean add(E e) {
   if (offer(e))
       return true;
   else
       throw new IllegalStateException("Queue full");
}

offer实现
offer方法做了两件事:
1.第一件事判断队列是否满,满了就直接释放锁,没满就将节点封装成Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象notFull上的添加线程。
2.第二件事判断是否需要唤醒等到在notEmpty条件对象上的消费线程。

public boolean offer(E e) {
    //添加元素为null直接抛出异常
    if (e == null) throw new NullPointerException();
    //获取队列的个数
    final AtomicInteger count = this.count;
    //判断队列是否已满
    if (count.get() == capacity)
        return false;
    int c = -1;
    //构建节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);//添加元素
            c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
            //如果容量还没满
            if (c + 1 < capacity)
                notFull.signal();//唤醒下一个添加线程,执行添加操作
        }
        } finally {
            putLock.unlock();
        }
    // 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
    //这里的if条件表示如果队列中还有1条数据
    if (c == 0) 
        signalNotEmpty();//如果还存在数据那么就唤醒消费锁
        return c >= 0; // 添加成功返回true,否则返回false
    }
}

enqueue入队操作

//入队操作
private void enqueue(Node node) {
    //队列尾节点指向新的node节点
    last = last.next = node;
}

signalNotEmpty唤醒 删除线程(如消费者线程)

//signalNotEmpty方法
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
        //唤醒获取并删除元素的线程
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

为什么添加完成后是继续唤醒在条件对象notFull上的添加线程,而不是像ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程?
唤醒添加线程的原因,在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象notFull上的添加线程,这点与前面分析ArrayBlockingQueue很不相同,在ArrayBlockingQueue内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue只用了一个ReentrantLock同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于LinkedBlockingQueue来说就不一样了,其内部对添加线程和消费线程分别使用了各自的ReentrantLock锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,当然offer方法并不会挂起,而是直接结束,只有put方法才会当队列满时才执行挂起操作。注意消费线程的执行过程也是如此。
为什么判断if(c==0)时采取唤醒消费线程?

if (c == 0)  //c拿到当前未添加新元素时的队列长度
    signalNotEmpty();//如果还存在数据那么就唤醒消费锁

这是因为消费线程一旦被唤醒时一直在消费的(前提是有数据),所以C值一直在变化的,c值是添加完元素前队列的大小,此时c只可能是0或c>0,如果c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(poll,take,remove)调用,那么为什么不是条件c>0采取唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前c>0,那么很可能上一次调用消费线程后,数据没有被消费完,条件队列上也就不存在等待的消费线程了,所以c>0唤醒消费线程的意义不是很大,当然如果添加线程一直添加元素,那么一直c>0,消费线程执行的换就要等待下一次调用消费操作了(poll,take,remove)。

1.5 阻塞式添加元素:put 方法原理

lockInterruptibly()方法能够中断等待获取锁的线程。当两个线程同时通过lock.lockInterruptibly()获取某个锁时,假若此时线程A获取到了锁,而线程B只有等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。参考

//将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用   
public void put(E e) throws InterruptedException {  
  //判断添加元素是否为null  
  if (e == null)  
      throw new NullPointerException();  
  int c = -1;  
  final ReentrantLock putLock = this.putLock;  
  final AtomicInteger count = this.count;  
  //获取插入的可中断锁  
  putLock.lockInterruptibly();  
  try {  
      try {  
          //判断队列是否已满  
          while (count.get() == capacity)  
              //如果已满则阻塞添加线程  
              notFull.await();  
      } catch (InterruptedException ie) {  
          //失败就唤醒添加线程  
          notFull.signal();   
          throw ie;  
      }  
      //添加元素  
      insert(e);  
      //修改c值  
      c = count.getAndIncrement();  
      //根据c值判断队列是否已满  
      if (c + 1 < capacity)  
          //未满则唤醒添加线程  
          notFull.signal();  
  } finally {  
      //释放锁  
      putLock.unlock();
  }  
  //c等于0代表添加成功  
  if (c == 0)  
      signalNotEmpty();  
}  

总结一下添加操作流程:
1.获取putLock锁
2.如果队列已满,阻塞添加线程(notFull.await())
3.元素入队
4.当前生产者添加元素之后如果队列还没有满,则通知其他生产者添加元素(notFull.signal())
5.释放putlock锁
6.如果队列中已经有元素,则通知消费者。

在添加元素时,如果队列已满,那么新到来的put线程将被添加到notFull条件等待队列中,具体如下图所示:


image.png

notFull 条件队列与putLock 显示锁关联,而不是与takeLock显示锁关联。putLock 显示锁负责对元素添加进行同步,具体的代码如下:

 /** putLock显示锁 */
private final ReentrantLock putLock = new ReentrantLock();
/**putLock 条件队列与putLock显示锁关联 */
private final Condition notFull = putLock.newCondition();

1.6非阻塞式移除:poll方法原型

poll操作流程:
1.如果队列没有数据,就返回null
2.加takeLock锁。
3.如果有数据就poll方法取出来,取到之后,如果队列还有数据,那么唤醒等待在条件对象notEmpty上的消费线程。
4.释放takeLock锁。
5.判断c==capacity,如果为true则唤醒添加线程。

public E poll() {
    //获取当前队列的大小
    final AtomicInteger count = this.count;
    if (count.get() == 0)//如果没有元素直接返回null
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //判断队列是否有数据
        if (count.get() > 0) {
            //如果有,直接删除并获取该元素值
            x = dequeue();
            //当前队列大小减一
            c = count.getAndDecrement();
            //如果队列未空,继续唤醒等待在条件对象notEmpty上的消费线程
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
    takeLock.unlock();
    }
    //判断c是否等于capacity,这是因为如果满说明NotFull条件对象上
    //可能存在等待的添加线程
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue从头部删除元素

private E dequeue() {
    Node<E> h = head;//获取头结点
    Node<E> first = h.next; 获取头结的下一个节点(要删除的节点)
    h.next = h; // help GC//自己next指向自己,即被删除
    head = first;//更新头结点
    E x = first.item;//获取删除节点的值
    first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
    return x;
}

1.7阻塞移除元素:take方法原理

主要做了两件事:
1.如果队列中没有数据就挂起当前线程到notEmpty条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程。
2.尝试唤醒条件对象notFull上等待队列中的添加线程。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    //获取当前队列大小
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();//可中断
    try {
        //如果队列没有数据,挂机当前线程到条件对象的等待队列中
        while (count.get() == 0) {
            notEmpty.await();
        }
        //如果存在数据直接删除并返回该数据
        x = dequeue();
        c = count.getAndDecrement();//队列大小减1
        if (c > 1)
            notEmpty.signal();//还有数据就唤醒后续的消费线程
    } finally {
        takeLock.unlock();
    }
    //满足条件,唤醒条件对象上等待队列中的添加线程
    if (c == capacity)
        signalNotFull();
    return x;
}

1.8提取元素:peek和element

public E element() {
    E x = peek();//直接调用peek
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();//没数据抛异常
}

peek方法从头结点直接就可以获取到第一添加的元素,所以效率是比较高的,如果不存在则返回null。

//获取但不移除此队列的头;如果此队列为空,则返回 null 
public E peek() {  
    //判断元素数是否为0  
    if (count.get() == 0)  
        return null;  
    final ReentrantLock takeLock = this.takeLock;  
    //获取获取锁  
    takeLock.lock();  
    try {  
        //头节点的 next节点即为添加的第一个节点  
        Node<E> first = head.next;  
        //如果不为空则返回该节点  
        if (first == null)  
            return null;  
        else  
            return first.item;
    } finally {  
      //释放锁  
        takeLock.unlock();  
    }  
} 

从代码来看,head头结节点在初始化时是本身不带数据的,仅仅作为头部head方便我们执行链表的相关操作。peek返回直接获取头结点的下一个节点返回其值,如果没有值就返回null,有值就返回节点对应的值。

1.9 移除元素 remove的实现原理

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();//同时对putLock和takeLock加锁
    try {
        //循环查找要删除的元素
        for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
            if (o.equals(p.item)) {//找到要删除的节点
                unlink(p, trail);//直接删除
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();//解锁
    }
}
//两个同时加锁
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

remove方法删除指定的对象,这里我们可能会诧异,为什么同时对putLock和takeLock加锁?这是因为remove方法删除的数据的位置不确定,为了避免造成并非安全问题,所以需要对2个锁同时加锁。

2 ArrayBlockingQueue

ArrayBlockingQueue采用的是循环数组的形式表达队列,所以在该类中不存在扩容的情况,对于数组的长度来说,根据初始化的参数为标准,类中没有默认的数组长度。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>  implements BlockingQueue<E>, java.io.Serializable {
       final Object[] items;//存放元素的数组
       int takeIndex;//记录元素被取出元素的数组下标
       int putIndex;//记录元素被放入元素的数组下标
       int count;//记录元素的个数
       final ReentrantLock lock;//锁,同样也是保证多线程安全的一个重要因素
       private final Condition notEmpty;
       //notEmpty是当前lock的阻塞队列
       //作用就是采用内部的一个Condition队列来存储想通过put进行添加元素,但由于数组已满而被阻塞的线程。
       private final Condition notFull;
       //notFull是当前lock的另一个阻塞队列
       //作用就是采用内部的Condition队列来存储想通过take进行取出元素,但由于数组为空而被阻塞的线程。
       transient Itrs itrs = null;
}  

int count 这个成员是用来记录数组中当前拥有的元素的个数,那么通过添加和取出都要对这个变量进行操作,而这个成员并没有采用volitile进行修饰。但是该类采用了一个ReentrantLock lock,那我们就可以得出结论,这个类在所有的方法上都采用lock进行多线程安全的保证。所以对于count来说可以不添加任何有关多线程安全的修饰。

poll()

 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();//这里的lock就是ArrayBlockingQueue类中唯一的锁lock。
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
 }

offer(E e)

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
}

put(E e)

 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

take()

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

既然说到了阻塞线程需要被唤醒,何时被唤醒?
对于put来说,表明数组中没有位置,所以无法put,只能阻塞,当一旦有poll操作的时候,poll方法会调用一个dequeue方法这个方法就是将数组中最老的元素进行取出,关键是在最后有一句 notFull.signal();
采用这句进行唤醒在notFull阻塞队列中的线程,被唤醒的线程加入到lock的等待队列中,等待机会执行。

 private E dequeue() {
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

对于take来说,数组为空时,无法获取,进行阻塞,当一旦有offer操作的时候,offer操作会调用enqueue()方法,进行元素的添加,在最后进行notEmpty.signal();唤醒等待取操作的线程,被唤醒的线程加入到lock的等待队列中,等待机会执行。

 private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容