并发编程之 LinkedBolckingQueue 源码剖析

前言

JDK 1.5 之后,Doug Lea 大神为我们写了很多的工具,整个 concurrent 包基本都是他写的。也为我们程序员写好了很多工具,包括我们之前说的线程池,重入锁,线程协作工具,ConcurrentHashMap 等等,今天我们要讲的是和 ConcurrentHashMap 类似的数据结构,LinkedBolckingQueue,阻塞队列。在生产者消费者模型中,该类可以帮助我们快速的实现业务功能。

  1. 如何使用?
  2. 源码分析

1. 如何使用?

我们在生产者消费者模型,生产者向一个数据共享通道存放数据,消费者从相同的数据共享通道获取数据,将生产和消费完全隔离,不仅是生产者消费者,现在流行的消息队列,比如各种MQ,kafka,和这个都差不多。废话不多说,直接来个demo ,看看怎么使用:

  public static void main(String[] args) {
    LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1024);
    for (int i = 0; i < 5; i++) {
      final int num = i;
      new Thread(() -> {
        try {
          for (int j = 0; ; j++) {
            linkedBlockingQueue.put(num + "号线程的" + j + "号商品");
            Thread.sleep(5000);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }).start();
    }

    for (int i = 0; i < 5; i++) {
      new Thread(() -> {
        try {
          for (; ; ) {
            System.out.println("消费了" + linkedBlockingQueue.take());
            Thread.sleep(1000);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }).start();
    }
  }


运行结果:

消费了0号线程的0号商品
消费了3号线程的0号商品
消费了2号线程的0号商品
消费了1号线程的0号商品
消费了4号线程的0号商品
消费了2号线程的1号商品
消费了1号线程的1号商品
消费了0号线程的1号商品
消费了3号线程的1号商品
消费了4号线程的1号商品
消费了1号线程的2号商品
消费了0号线程的2号商品
消费了2号线程的2号商品
消费了3号线程的2号商品
消费了4号线程的2号商品
·········

从上面的代码中,我们使用了5条线程分别向队列中插入数据,也就是一个字符串,然后让5个线程从队列中取出数据并打印,可以看到,生产者插入的数据从消费者线程中被打印,没有漏掉一个。

注意,这里的 put 方法和 take 方法都是阻塞的,不然就不是阻塞队列了,什么意思呢?如果队列满了,put 方法就会等待,直到队列有空为止,因此该方法使用时需要注意,如果业务即时性很高,那么最好使用带有超时选项的 offer (V,long,TimeUnit),方法,同样, take 方法也是如此,当队列中没有的时候,就会阻塞,直到队列中有数据为止。同样可以使用 poll(long, TimeUnit)方法超时退出。

当然不止这几个方法,楼主将常用的方法总结一下:

插入方法:

 
    // 如果满了,立即返回false
    boolean b = linkedBlockingQueue.offer("");
    // 如果满了,则等待到给定的时间,如果还满,则返回false
    boolean b2 = linkedBlockingQueue.offer("", 1000, TimeUnit.MILLISECONDS);
    // 阻塞直到插入为止
    linkedBlockingQueue.put("");

取出方法:

    // 如果队列为空,直接返回null
    Object o3 = linkedBlockingQueue.poll();
    // 如果队列为空,一直阻塞到给定的时间
    Object o1 = linkedBlockingQueue.poll(1000, TimeUnit.MILLISECONDS);
    // 阻塞,直到取出数据
    Object o = linkedBlockingQueue.take();
    // 获取但不移除此队列的头;如果此队列为空,则返回 null。
    Object peek = linkedBlockingQueue.peek();

那么这些方法内部是如何实现的呢?

2. 源码分析

阻塞队列,重点看 put 阻塞方法和 take 阻塞方法。

put 方法:
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

该方法步骤如下:

  1. 根据给定的值创建一个 Node 对象,该对象有2个属性,一个是 item,一个是 Node 类型的 next,是链表结构的节点。
  2. 获取 put 的锁,注意,这里,put 锁和 take 锁是分开的。也就是说,当你插入的时候和取出的时候用的不是一把锁,可以高效并发,但是如果两个线程同时插入就会阻塞。
  3. 获取链表的长度。
  4. 使用中断锁,如果调用了线程的中断方法,那么,处于阻塞中的线程就会抛出异常。
  5. 判断如果当前链表长度达到了设置的长度,默认是 int 最大型,就调用 put 锁的伙伴 Condition 对象 notFull 让当前线程挂起等待。 直到 take 方法中会调用 notFull 对象的 signal 方法唤醒。
  6. 调用 enqueue 方法,将刚刚创建的 Node 节点连接到链表上。
  7. 将链表长度变量 count 加一。 判断如果加一后,链表长度还小于链表规定的容量,那么就唤醒其他等待在 notFull 对象上的线程,告诉他们可以取数据了。
  8. 放开锁,让其他线程争夺锁(非公平锁)。
  9. 如果c是0,表示队列已经有一个数据了,通知唤醒挂在 notEmpty 的线程,告诉他们可以取数据了。
take 方法如下:
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

步骤如下:

  1. 获取链长度,获取 take 锁。
  2. 调用可中断的 lock 方法。开始锁住。
  3. 如果队列是空,则挂起线程。开始等待。
  4. 如果不为空,则调用 dequeue 方法,拿到头节点的数据,并将头节点更新。
  5. 将队列长度减一。判断如果队列长度大于1,通知等待在 notEmpty 上的线程,可以拿数据了。
  6. 解锁。
  7. 如果变量 c 和 容量相同,而刚刚又消费了一个节点,说明队列不满了,则通知生产者可以添加数据了。
  8. 返回数据。
boolean offer(E e, long timeout, TimeUnit unit) 源码:
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

该方法会阻塞给定的时间,如果时间到了,则返回false。
和 put 方法很相似,步骤如下:

  1. 将时间转成纳秒。
  2. 获取 put 锁。
  3. 调用可中断锁方法。
  4. 如果容量满了,并且设置的等待时间小于0,返回 false,表示插入失败,反之,调用 notFull 方法等待给定的时间,并返回一个负数,当第二次循环的时候,继续判断,如果还是满的并且小于0,返回false。
  5. 如果容量没有满,或者等待过程被唤醒,则调用 enqueue 插入数据。
  6. 获取当前链表长度。
  7. 判断链表长度+1是否小于设置的容量。如果小于,则链表没有满,通知生产者可以添加数据了。
  8. 释放锁。 如果 c 等于 0,表示之前没有数据,但是现在已经加入一个数据了,可以通知其他的消费者来消费了。
  9. 返回 true。
E poll(long timeout, TimeUnit unit) 源码分析
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

该方法会阻塞给定的时间,如果取不到数据,返回null。

步骤其实和上面的差不多,楼主偷个懒,就不解释了。

总结

从源码分析中,我们可以看到,整个阻塞队列就是由重入锁和Condition 组合实现的,和我们之前用 synchronized 加上 wait 和 notify 实现很相似,只是楼主的那个例子没有使用队列,因此无法将锁分开,也就是我们之前说的锁分离的技术。那么,整体的性能当然不能和 Doug Lea 大神的比了。

好了。今天的并发源码分析,就到这里。

good luck!!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,806评论 1 19
  • 一、并发 进程:每个进程都拥有自己的一套变量 线程:线程之间共享数据 1.线程 Java中为多线程任务提供了很多的...
    SeanMa阅读 2,430评论 0 11
  • 锁 锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多...
    黄俊彬阅读 1,481评论 1 15
  • 第三章 Java内存模型 3.1 Java内存模型的基础 通信在共享内存的模型里,通过写-读内存中的公共状态进行隐...
    泽毛阅读 4,349评论 2 22
  • 上周过的不孬,晚饭都是在被请中度过的,周一佳佳请吃,周二郭总请吃,周三鹏哥请吃,周四小玉姐请吃,吼吼,周五,好像没...
    我的名字叫浩仔阅读 134评论 0 0