Java并发容器 -- LinkedBlockingQueue

LinkedBlockingQueue

数据结构

单向链表数据结构,代码如下:

static class Node<E> {
    // 节点数据
    E item;
    // 指向下一个节点的引用
    Node<E> next;
    // Constructor
    Node(E x) { item = x; }
}

LinkedBlockingQueue实现原理

是基于单向链表数据结构的BlockingQueue,为了保证线程安全,定义了两把锁(ReentrantLock):putLock(入队操作的锁)和takeLock(出队操作的锁)。还有两个条件变量(Condition):notEmptytakeLock锁所持有)和notFullputLock锁所持有)。还有一个count属性(AtomicInteger),表示队列中的元素数量。

元素入队(put)和出列('take')两个核心操作:

  • 当入队时(put方法),首先需要获取putLock锁,循环判断当前队列中的元素数量和capacity是否相等(表示队列已满),如果相等,则调用notFull.await方法,调用该方法的线程进入阻塞状态并释放putLock锁。如果队列未满,则创建Node将该NoDE指向队列尾部,并将count + 1。如果当前队列未满则调用notFull.signal方法,唤醒其它需要入队的线程。当count == 0时,调用notEmpty.signal方法,唤醒其它需要出队的线程。
  • 当出队时(take方法),首先需要获取takeLock锁,循环判断当前队列是否为空,如果为空,则调用notEmpty.await方法,调用该方法的线程进入阻塞状态并释放takeLock锁。如果队列还有元素,则出队,并将count - 1。如果当前队列仍有元素,则调用notEmpty.signal方法,唤醒其它需要出队的线程。当count == capacity时,调用notFull.signal方法,唤醒其它需要入队的线程。

LinkedBlockingQueue核心源码解读

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    // 队列的容量
    private final int capacity;
    // 当前队列中元素的个数,线程安全的
    private final AtomicInteger count = new AtomicInteger();
    // 队列的头节点, head.item == null
    transient Node<E> head;
    // 队列的尾节点,last.next == null
    private transient Node<E> last;
    // 当调用take、pool等方法时,需要持有的锁(takeLock)
    private final ReentrantLock takeLock = new ReentrantLock();
    // 从队列删除元素时,所持有锁的条件,既队列非空情况下。
    private final Condition notEmpty = takeLock.newCondition();
    // 当调用put、offer等方法时,需要持有的锁(putLock)
    private final ReentrantLock putLock = new ReentrantLock();
    // 当从队列中插入一个元素时,所持有锁的条件,既队列非满情况下。
    private final Condition notFull = putLock.newCondition();
    // 当有线程调用put/offer方法时,此时队列为非空,需要调用该方法去通知(唤醒)调用了take/poll方法的线程。
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 通知调用了take或poll方法的线程await
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    // 当有线程调用take或poll方法时,此时队列为非满,需要调用该方法去通知(唤醒)调用了put或offer方法的线程。
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 通知调用了take或poll方法
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    // 入队,向队尾中插入元素
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    // 出队,从队头中删除元素
    private E dequeue() {
        Node<E> h = head; // head
        Node<E> first = h.next; // first为新的head
        h.next = h; // help GC h.next 指向 h h节点变成孤儿节点
        head = first; // head 指向first
        E x = first.item; 
        first.item = null;
        return x;
    }
    // 获取两把锁
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    // 释放两把锁
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    // 默认构造器,创建Integer.MAX_VALUE容量的队列
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    // 指定容量大小的队列
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    // 将指定的集合中的元素插入到队列中,capacity == Integer.MAX_VALUE
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // 永远不会有锁竞争,但是需要保证可见性
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                // 入队
                enqueue(new Node<E>(e));
                ++n;
            }
            // 将count设置 c.size
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
    // 返回限列中的元素数量
    public int size() {
        return count.get();
    }
    // 队列中剩余的容量大小
    public int remainingCapacity() {
        return capacity - count.get();
    }
    // 队列中插入元素,如果队列已满一直阻塞到队列有剩余空间。
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1; // 默认状态,表示put失败
        Node<E> node = new Node<E>(e); // 创建Node
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 支持中断的获取锁
        putLock.lockInterruptibly();
        try {
            // 注意这里一定用while,而不是if。当队列已满,则该方法进入阻塞状态
            while (count.get() == capacity) {
                notFull.await();
            }
            // 队列没满,元素入队
            enqueue(node);
            // 队列元素个数 + 1
            c = count.getAndIncrement();
            // 如果当前队列没满,通知其它调用put或offer线程可以插入元素了
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c == 0 代表队列中已经有一个元素,通知调用take或poll的线程可以拿元素
        // TODO c > 0 为什么不通知?
        if (c == 0)
            signalNotEmpty(); // notEmpty.signal();
    }
    // 指定的超时时间间隔内在队列尾部插一个元素,如果超时则返回false。阻塞
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 超时时间的纳秒数
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 注意这里一定用while,而不是if。当队列已满,则该方法进入阻塞状态
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            // 队列没满,元素入队
            enqueue(new Node<E>(e));
            // 队列元素个数 + 1
            c = count.getAndIncrement();
            // 如果当前队列没满,通知其它调用put或offer线程可以插入元素了
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 如果队列已满直接返回
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 只有队列未满时才可以将元素入队
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 获取takelock锁
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,则进入阻塞状态,并释放锁
            // 当有其它线程唤醒当前线程,注意还需要循环验证一下队列是否为空,因为可能有其它线程拿走元素使队列又为空
            while (count.get() == 0) { 
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            // 计数减一
            c = count.getAndDecrement();
            // 如果队列个数大于,则唤醒其它调用take或poll操作的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 如果队列中的元素个数等于capacity,则唤醒调用put或offer的线程。(因为该操作拿走一个元素,此时队列的个数是capacity - 1。
        if (c == capacity)
            signalNotFull();
        return x;
    }
    // 和take方法逻逻一样,只是调用带超时的await方法,当超过一段时间内没有被唤醒则放弃该操作
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    // 该操作不会被阻塞(也就是叫不会调用await方法)。如果队列为空则返回null
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // 从队列的头部获取元素,注意它需要获取takeLock锁,如果不加锁,有可能读取null值。因为可能有可能其它线程调用了take方法导致head发生变化。
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // 获取takeLock,处于阻塞状态
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }
}

总结

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容

  • 夜莺2517阅读 127,718评论 1 9
  • 版本:ios 1.2.1 亮点: 1.app角标可以实时更新天气温度或选择空气质量,建议处女座就不要选了,不然老想...
    我就是沉沉阅读 6,887评论 1 6
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,535评论 28 53
  • 兔子虽然是枚小硕 但学校的硕士四人寝不够 就被分到了博士楼里 两人一间 在学校的最西边 靠山 兔子的室友身体不好 ...
    待业的兔子阅读 2,597评论 2 9