[Java源码][并发J.U.C]---阻塞队列LinkedBlockingQueue

前言

LinkedBlockingQueue是一个由链表结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序.

本文源码: 源码地址

例子

本文先以一个小例子简单看看LinkedBlockingQueue的简单使用. 分别有两个类Producer负责产生新数据,Consumer负责消费数据. 例子中有两个消费者和三个生产者,每个生产者生成3条数据.

package com.linkedblockingqueue;

public class Test01 {

    static LinkedBlockingQueue lbq = new LinkedBlockingQueue(5);

    public static void main(String[] args) {
        Consumer consumer01 = new Consumer("consumer01");
        Consumer consumer02 = new Consumer("consumer02");
        Producer producer01 = new Producer("producer01");
        Producer producer02 = new Producer("producer02");
        Producer producer03 = new Producer("producer03");
        consumer01.start();
        consumer02.start();
        producer01.start();
        producer02.start();
        producer03.start();
    }

    static class Consumer extends Thread {
        Consumer(String name) {super(name);}
        public void run() {
            try {
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " gets " + lbq.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Producer extends Thread {
        Producer(String name) {super(name);}
        public void run() {
            try {
                for (int i = 0; i <3; i++) {
                    lbq.put(Thread.currentThread().getName() + "-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

结果如下: 只要不重复消费数据即可.

consumer01 gets producer01-0
consumer02 gets producer01-1
consumer01 gets producer01-2
consumer02 gets producer02-0
consumer01 gets producer02-1
consumer02 gets producer02-2
consumer01 gets producer03-0
consumer02 gets producer03-1
consumer01 gets producer03-2

实现思路

首先想想如果不存在并发的问题,如何用链表实现一个队列呢,很简单,就是维护一个链表, 当要加入数据到队列中时,就生成一个节点往链表尾部插入并更新尾节点即可,当要取数据时从链表头部取数据并更新头节点.

当需要考虑并发的时候, 有几种方式,一种是利用CAS,volatile,比如ConcurrentLinkedQueue;另一种是加锁,本文中的LinkedBlockingQueue即采用加锁的方式进行操作的.

LinkedBlockingQueue采用了ReentrantLockCondition的方式来进行操作的, 如果不了解可以看一下我的另外两篇博客[Java源码][并发J.U.C]---解析Condition[Java源码][并发J.U.C]---用代码一步步实现ReentrantLock.

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

当需要往队列尾部插入数据时需要获得putLock锁, 当需要从队列头部获取数据时需要获得takeLock锁, 需要删除队列中某个元素时需要同时获得putLock锁和takeLock锁.
通俗得理解就是操作队列尾部需要putLock锁,操作队列头部需要takeLock锁,操作整个队列需要putLocktakeLock.

当队列中元素已满时,此时如果插入元素,会调用notFull.await()进行等待, 如果队列中元素没满时,需要调用notFull.signal()给那些之前因为队列满无法插入元素的休眠线程信号. 另外当队列元素由空到有的那个过程中需要调用一次notEmpty.signal()去给那些因为队列是空没有取得元素导致休眠(因为队列空此时会调用notEmpty.await()导致休眠)的线程信号.

同样的道理, 在获取元素时, 如果队列为空,会调用notEmpty.await()导致该线程休眠,如果获取元素后队列不为空,会调用notEmpty.signal()去给那些因为队列为空而休眠的线程信号. 另外当队列是从满到不满的过程中需要调用一次notFull.signal()去给那些因为队列满而无法put,poll操作的线程信号.

如果有点绕口的话我们就看看源代码会比较帮助理解.

源代码

属性

表示节点的类Node.
capacity: 队列的容量
count: 当前队列的容量
head: 链表头节点
last: 链表尾节点

/**
     * 链表的节点
     */
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 容量 如果不指定则为Integer.MAX_VALUE */
    private final int capacity;

    /** 链表中的元素个数 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表的头部 head.item 为 null
     */
    transient Node<E> head;

    /**
     * 链表的尾部 last.next 为 null
     * Invariant: last.next == null
     */
    private transient Node<E> last;

初始化

初始化的时候链表头尾节点相同并且其值为null.

 /**
     * 无参构造函数 capacity默认为Integer.MAX_VALUE
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * 有参构造函数 并且初始化头节点和尾节点
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

辅助方法

包括进出队列,同时获得释放两个锁,和给等待的取操作或放操作信号等等.在下面要讨论的插入和获取元素会用到.

/**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     * signal一个正在等待的take操作
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     * Signal一个等待的put操作
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 将node节点入队列
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * 从链表中返回一个节点 并更新头节点
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * 同时获得putLock和takeLock
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * 同时释放putLock和takeLock
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

插入元素

put()方法: 将指定的元素插入此队列的尾部,如果空间已满则等待其空间变为可用.

/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * 将指定的元素插入此队列的尾部,如果空间已满则等待其变为可用
     * 获得锁的过程中可以响应中断
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 如果当前队列容量已满 则该线程释放锁并休眠
            while (count.get() == capacity) {
                notFull.await();
            }
            // 往队列尾部插入一个节点
            enqueue(node);
            // 注意c是先get再increment
            c = count.getAndIncrement();
            // 增加这个元素后如果队列还没有满则给休眠的线程发信号
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        /**
         *  表示唤醒给取操作(take,poll)信号
         *  因为现在给空的队列中放入了一个元素
         */
        if (c == 0)
            signalNotEmpty();
    }

从代码中可以看到put方法是肯定会成功的,除非获得锁或者在休眠过程中被中断(此时会抛出中断异常).

put对应的还有两个插入元素的方法offer(E e, long timeout, TimeUnit unit) throws InterruptedExceptionoffer(E e).
offer(E e)方法是在如果队列不为空才插入返回true,如果为空则立马返回false,另外它不响应中断.
offer(E e, long timeout, TimeUnit unit) throws InterruptedException 如果队列为空时会等到timeout后才返回false,另外它与put一样在获得锁或者在休眠过程会响应中断.

获取元素

put对应的是take,与offer对应的是poll.

/**
     * 如果队列不为空 取队列中的第一个元素
     * 否则一直等待
     *
     * @return 队列中的第一个元素
     * @throws InterruptedException 获得锁的过程中线程被中断
     */

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,则一直等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 从队列中取进入时间最长的元素
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        /**
         *  表示唤醒给放操作(put,offer)信号
         *  因为现在给满的队列中消费了一个元素
         */
        if (c == capacity)
            signalNotFull();
        return x;
    }

另外注意dequeue中删除元素是将头节点的下一个节点的item先保存好然后将它的item属性设置为null,该节点会成为新的头节点,而原先的头节点的next会指向自己. 这个小细节在Itr中的nextNode方法会用到该特性.

对应的poll方法和peek方法就不多说了,

删除元素

/**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * 删除元素o
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            // p 是要被删除的节点, trail是p节点的前驱节点
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

需要注意两点
1.remove方法中是需要获得两个锁takeLockputLock,因为这是在操作整个链表.
2. 由于是删除元素,可以间接性的认为是在消费队列,所以如果队列是从满到不满的一个过程则需要调用notFull.signal()方法.(在unlink方法中)

与此类似的还有contains(Object o)clear() 方法就不多说了.

drainTo

该方法的意思是从队列中消费指定的个数(默认是Integer.MAX_VALUE)并加入到指定的容器中.

/**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     *
     * 将元素都取到c中 n = Math.min(maxElements, 当前队列中的元素个数);
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                // 消费n个元素并且加入到容器c中
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    // 判断是否是一个满到不满的过程
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

遍历元素

Iterator很常规的方法,由于是操作整个链表,因此需要同时获取的是两个锁.

参考

1. Java1.8 源码.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355