阻塞队列之LinkedBlockingQueue(LBQ)JDK1.8实现原理及源码详解

看这篇文章,如果想要理解其实现原理的话,可以先看我上一篇文章,ABQ的源码和实现原理的分析;
https://www.jianshu.com/p/4da0470f0b87

两个的实现原理是一样的,区别就是:

相同点:ArrayBlockingQueue和的LinkedBlockingQueue都是通过条件通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
不同点:1。ArrayBlockingQueue底层是采用的数组进行实现,而的LinkedBlockingQueue则是采用链表数据结构;

ArrayBlockingQueue插入和删除数据,只采用了一个锁,而的LinkedBlockingQueue的英文则在插入删除状语从句:采用分别了putLock状语从句:takeLock,这样可以降低线程由于线程无法获取到锁而进入WAITING状态的可能性,从而提高了线程并发执行的效率。

LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出

public LinkedBlockingQueue() { 
  this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 
} 
public LinkedBlockingQueue(int capacity) { 
  if (capacity <= o) throw new IllegalArgumentException(); 
  this.capacity = capacity; 
  last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 
} 
public LinkedBlockingQueue(Collection<? extends E> c ) { 
  this(Integer.MAX_VALUE); 
  final ReentrantLock putLock = this.putLock; 
  putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作
//,同样也是为了保证其可见性。 
  try { 
      int n = 0; 
      for (E e : c) { 
          if (e == null) 
              throw new NullPointerException(); 
          if (n == capacity) 
              throw new IllegalStateException("Queue full"); 
          enqueue(new Node<E>(e));//入队 
          ++n; 
      } 
      count.set(n); 
  } finally { 
      putLock.unlock(); 
  } 
}

在第12行中获取锁是为了保证可见性,原因我认为是线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。

然后来看下几个重要的属性:

    //链表队列的最大长度
    private final int capacity;

   //链表队列的长度,数量
    private final AtomicInteger count = new AtomicInteger();

    //队列的头节点
    transient Node<E> head;

    //队列的尾节点
    private transient Node<E> last;

    //类似下面的生产线程
    private final ReentrantLock takeLock = new ReentrantLock();
    //类似消费者线程的作用。
    private final ReentrantLock putLock = new ReentrantLock();

//可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。
//而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。
    private final Condition notFull = putLock.newCondition();
   private final Condition notEmpty = takeLock.newCondition();

可以看出与ArrayBlockingQueue主要的区别是,的LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的锁(takeLock和putLock)来控制线程安全的,因此,也由这两个锁定生成了两个对应的条件(notEmpty和notFull)来实现可阻塞的插入和删除数据并且,采用了链表的数据结构来实现队列,节点结点的定义.

入队列第一种处理:
add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue

//该方法在AbstractQueue中,子类(LBQ)通过重写offer方法,调用父类的add方法。
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//LBQ重写的
public boolean offer(E e) {
      //如果队列为null,则抛异常
        if (e == null) throw new NullPointerException();
//为了保证可见性,count设置成局部变量AtomicInteger 原子类
        final AtomicInteger count = this.count;
        if (count.get() == capacity)   //当数据量等于队列容量时,无法入队,返回false 
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);  //封装成node
        final ReentrantLock putLock = this.putLock;  //插入锁 
        putLock.lock();   //获取锁,后面的获取失败会阻塞。
        try {
            if (count.get() < capacity) {
                enqueue(node);   //入队列
                c = count.getAndIncrement();    #标注1 //使用CAS操作将队列数量的count加一。
                                              //注意,这里返回的是自增前的值
                if (c + 1 < capacity)  //如果入队列后,队列当前数量仍然小于队列总容量,如果有入队列线程阻塞,则随机唤醒一个。
//执行入队列线程。
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)   //看上面#标注1,因为c等于自增前的操作,那么这里是第一次加入队列。
//这里可以这样理解,如果一个队列为null那么有出队列操作的话,会阻塞,只有当第一次开始
//入队列后,就唤醒该出队列阻塞线程。
            signalNotEmpty();
        return c >= 0;
    }

//入队列
private void enqueue(Node<E> node) {
        last = last.next = node;  
    }
//唤醒出队列阻塞线程。
private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

入队列第二种处理方式
offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 见上面代码。

第三种处理方式
offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();  //区别,可响应中断是否锁
        try {
            while (count.get() == capacity) {  //如果队列满,则自旋
                if (nanos <= 0)    //计算时间,超时返回false
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }

            enqueue(new Node<E>(e));  //如果队列没有满,可以执行入队列
            c = count.getAndIncrement();   //判断是否可以继续执行入队列线程的条件
            if (c + 1 < capacity)   #标注2
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)         //这里跟上面的处理意思一样。第一次入队列后,才可以唤醒出队列线程。
            signalNotEmpty();
        return true;
    }

重点来了: 看代码代码#标注2,很有意思,如果当前队列在加一个后,队列还没有满,那么可以继续随机唤醒一个阻塞的入队列线程。

第四种处理
put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;  
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;  
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
         
            while (count.get() == capacity) {   //区别:队列满,则阻塞
                notFull.await();
            }
            enqueue(node);   //队列不满了,则入队列
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

里面很多跟前面的处理思路是一样的。

关于出队列的四种阻塞处理


看第一种处理

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 
public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

//LBQ中重写
public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;  //获取takeLock锁
        takeLock.lock();  
        try {
            if (count.get() > 0) {    //如果队列不为null,则出队列
                x = dequeue();  //出队列
                c = count.getAndDecrement();   //count自减,返回自减前的值
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)       //如果c(出队列前的值)等于当前队列的总容量,则表示队列已经不为满了。
//表示可以继续入队列,则唤醒入队列线程
            signalNotFull();
        return x;
    }

//出队列操作,返回删除的头队列的值
private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;   
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

//唤醒入队列阻塞线程
private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

第二种阻塞处理
poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。  见上面。

第三种阻塞处理
poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();   //响应中断
        try {
            while (count.get() == 0) {      //如果当前队列为null,,如果超时还未获取到值,则返回null
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();  //出队列
            c = count.getAndDecrement();  //这里返回CAS操作自减1,返回自减之前的值。

          //如果出队列之前为队列数量的值大于1,
          //则表示可以继续出队列,随机唤醒一个出队列线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)    //如果队列执行出队列之前为满,则唤醒入队列线程
            signalNotFull();
        return x;
    }

第四种处理方式
take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;   //获取当前队列数量
        final ReentrantLock takeLock = this.takeLock;   //获取锁 
        takeLock.lockInterruptibly();      //响应中断
        try {
            while (count.get() == 0) {   //如果当前队列为null
                notEmpty.await();
            }
            x = dequeue();   //出队列
            c = count.getAndDecrement();   //获取队列自减之前的数量
            if (c > 1)       //参考上面的逻辑
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)      //参考上面的逻辑
            signalNotFull();
        return x;
    }

总结:

LBQ对比ABQ区别:

相同点:ArrayBlockingQueue和的LinkedBlockingQueue都是通过条件通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;

不同点:1。ArrayBlockingQueue底层是采用的数组进行实现,而的LinkedBlockingQueue则是采用链表数据结构;

ArrayBlockingQueue插入和删除数据,只采用了一个锁,而的LinkedBlockingQueue分别针对入队列和出队列使用了两个锁。
可以降低线程由于线程无法获取到锁而进入WAITING状态的可能性,从而提高了线程并发执行的效率。

整理不易,喜欢点个赞

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容