LinkedBlockingQueue源码分析

LinkedBlockingQueue是用链表实现的FIFO队列,该队列大小默认是Integer.MAX_VALUE,可以认为是无界的,也可指定大小,使其有界。基于链表的队列通常比基于数组的队列有更高的吞吐量,但大多数并发场景下,实际的性能优劣不易测量。
LinkedBlockingQueue通过一个哑铃节点、takeLock和putLock、(AtomicInteger类型的)count,以及级联通知实现了双锁队列算法(的变体),提高了性能。
java.util包下的迭代器是fail-fast的,即当迭代器创建后,若外部通过非迭代器自身的方法修改集合的内容,会抛出ConcurrentModificationException异常;而java.util.concurrent包下的迭代器是弱一致性的,即当迭代器创建后,其他线程通过非迭代器自身的方法修改集合的内容并不会抛出异常。

1. LinkedBlockingQueue继承关系图

2. LinkedBlockingQueue源码分析

2.1 内部类
    // 底层单向链表的Node
    static class Node<E> {
        E item;
        // 为了LinkedBlockingQueue.Itr的弱一致性,可能出现next指向当前节点自身的情况
        Node<E> next;
        Node(E x) { item = x; }
    }
2.2 字段
    // 队列容量(未指定时默认是Integer.MAX_VALUE)
    private final int capacity;

    // 队列中元素的个数
    // 因为添加、获取(删除)操作使用各自的锁,且两种操作都会修改count
    // 的值,因此使用AtomicInteger类型的count来保证原子性(和可见性)
    private final AtomicInteger count = new AtomicInteger();
    
    // 底层链表的头节点
    transient Node<E> head;
    
    // 底层链表的尾节点
    private transient Node<E> last;
    
    // take、poll等方法使用的ReentrantLock和Condition
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    
    // put、offer等方法使用的ReentrantLock和Condition
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
2.3 三个构造方法

(1)LinkedBlockingQueue(int)

    public LinkedBlockingQueue(int capacity) {
        // 检查capacity
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // last和head初始时都指向该哑铃节点,正是由于该节点,实现了添加元素和获取(删除)元素
        // 两种操作的解耦,使得LinkedBlockingQueue这个基于链表的阻塞队列可以采用双锁队列算法
        last = head = new Node<E>(null);
    }

(2)LinkedBlockingQueue()

    public LinkedBlockingQueue() {
        // 调用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
    }

(3)LinkedBlockingQueue(Collection)

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 调用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
        // 要(在队尾)添加元素,所以使用putLock锁
        final ReentrantLock putLock = this.putLock;
        // 实例化时不存在多线程竞争,这里上锁是为了可见性
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                // e不能为null
                if (e == null)
                    throw new NullPointerException();
                // 注意n从0开始
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e)); // 入队
                ++n;
            }
            // 更新count.value的值
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
2.4 辅助方法

(1)enqueue

    // 将node添加到队列末尾
    private void enqueue(Node<E> node) {
        // 该语句等价转化为:
        //       last.next = node;
        //       last = last.next;
        last = last.next = node;
    }

(2)dequeue

    // 从头部删除一个节点
    private E dequeue() {
        // 注意:head指向的是哑铃节点
        Node<E> h = head;
        Node<E> first = h.next;
        // 不直接将h.next置为null是为了LinkedBlockingQueue.Itr的弱一致性
        h.next = h; // help GC
        head = first;
        // 获取first.item作为返回值
        E x = first.item;
        // 将first.item置为null,作为新的哑铃节点
        first.item = null;
        return x;
    }

(3)signalNotEmpty

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // 先获取takeLock锁
        // notEmpty.signal中会判断当前线程是否持有锁,所以要调用
        // notEmpty的signal方法,必须持有takeLock锁,否则会抛出异常      
        takeLock.lock();
        try {
            // 调用notEmpty.signal后,会将条件队列中第一个调用了notEmpty.await或awaitNanos(nanos)
            // 的线程对应的Node转移到同步队列中,前驱节点调用unpark后才算真正唤醒该线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

(4)signalNotFull

    // 与signalNotEmpty类似
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        // 先获取putLock锁
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

(5)fullyLock

    // 上锁
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }   

(6)fullyUnlock

    // 解锁
    // 以一种顺序上锁,以相反的顺序释放锁,可避免死锁
    // (这里上锁顺序或解锁顺序颠倒也不会出现死锁)
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
2.5 添加元素

(1)put-无限期阻塞版

    public void put(E e) throws InterruptedException {
        // e不能为null
        if (e == null) throw new NullPointerException();
        // 约定put、take等方法中将c初始化为-1,表示入队失败
        int c = -1;
        // 将e封装成Node
        Node<E> node = new Node<E>(e);
        // 要(在队尾)添加元素,所以使用putLock锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
            // 获取count.value的值,若为capacity则阻塞当前线程
            // 可能出现当前线程被唤醒后队列大小仍是capacity的情况,当前线程会再次阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 注意:getAndIncrement中会以CAS的方式将count.value加1并返回count.value原来的值
            c = count.getAndIncrement();
            // 注意c是本次入队前count.value的值
            // 这两行语句算是级联通知结构的一部分,若不采用级联通知,即将这两行语句替换为
            // signalNotEmpty();则每次入队都要获取takeLock,调用notEmpty.signal,极大降低了性能
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c为0表示队列中有一个元素
        if (c == 0)
            // 为了降低添加元素相关操作对takeLock的依赖,采用了级联通知
            // 级联通知:
            //     只要队列中有一个元素,就通过signalNotEmpty获取takeLock锁,并调用notEmpty.signal
            //     将一个消费者线程(若存在的话)加入同步队列(此时可能没有消费者线程,这种情况下,后
            //     面过来的消费者线程是不会阻塞的,因为队列中已经有元素了、也可能有多个阻塞着的消费
            //     者线程),后面过来的线程和同步队列中的线程竞争锁,竞争到锁的线程会执行出队操作,
            //     若队列不为空,该线程会调用notEmpty.signal,如此往复......若队列为空了,消费者线程
            //     会阻塞,又会通过这里的signalNotEmpty进行处理。
            signalNotEmpty();
    }   

(2)offer(E)-非阻塞版

    public boolean offer(E e) {
        // e不能为null(略)
        final AtomicInteger count = this.count;
        // 先获取count.value的值,尝试判断一下
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 与put中相同的代码不再贴出
            // 当其他线程在上面if之后又添加元素时再次获取的
            // count.value就为capacity,条件为false,c为-1
            if (count.get() < capacity) {
                // 入队
                enqueue(node);
                // 再次强调:c是count.value入队前的值
                c = count.getAndIncrement();
                // 级联通知相关
                if (c + 1 < capacity)
                    notFull.signal();
            }
        // 若c仍为-1,表示入队失败,返回false,否则返回true
        return c >= 0;
    }   

(3)offer(E,long,TimeUnit)-限期阻塞版

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 与put中相同的代码不再贴出
        // 转化为纳秒数
        long nanos = unit.toNanos(timeout);
        while (count.get() == capacity) {
                // 队列仍是满的,但阻塞时长已到,在这里跳出
                if (nanos <= 0)
                    return false;
                // 阻塞指定时长,也可能被提前唤醒
                nanos = notFull.awaitNanos(nanos);
            }
        return true;
    }
2.6 获取(删除)元素

(1)take-阻塞版

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 要(在队t头)删除元素,所以使用takeLock锁
        takeLock.lockInterruptibly();
        try {
            // 队列为空则阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            // 强调:c是count.value入队前的值
            c = count.getAndDecrement();
            // c为1说明经过上面的出队操作队列已空
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c为capacity说明当前队列中的元素个数是capacity-1
        if (c == capacity)
            // 为了降低添加元素相关操作对putLock的依赖,采用了级联通知
            // 与put的级联通知是相对应的,不再介绍
            signalNotFull();
        return x;
    }   

poll()-非阻塞版和poll(long,TimeUnit)-限期阻塞版与offer(E)-非阻塞版和offer(E,long,TimeUnit)-限期阻塞版是相对应的,不再介绍。
(2)remove-直接删除元素

    // 注意take、poll等方法是获取元素,remove是直接删除队列中第一个满足o.equals(p.item)的元素
    public boolean remove(Object o) {
        // 队列中的元素不允许为null,所以这里o也不能为null
        if (o == null) return false;
        // 因为要操作的是整个队列,所以使用双锁
        fullyLock();
        try {
            // 遍历链表
            // trail是p的前一个节点,trail初始为哑铃节点
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    // 找到满足o.equals(p.item)的节点,断开该节点
                    unlink(p, trail);
                    //  已移除一个,返回true
                    return true;
                }
            }
            // 未找到匹配的节点,返回false
            return false;
        } finally {
            // 释放双锁
            fullyUnlock();
        }
    }
    void unlink(Node<E> p, Node<E> trail) {
        // 为了LinkedBlockingQueue.Itr的弱一致性,不将p.next置为null
        p.item = null;
        // 进行连接
        trail.next = p.next;
        // p是否是最后一个节点
        if (last == p)
            last = trail;
        // 若队列中元素个数为capacity-1,则进行级联通知
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
2.7 查看元素
    public E peek() {
        // 条件成立说明队列为空
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // 要查看对头元素的值,因此使用takeLock
        takeLock.lock();
        try {
            // 获取头节点的下一个节点
            // 注意头节点是哑铃节点
            Node<E> first = head.next;
            // 当在上面if之后,其他线程将队列中的元素取完时,这里first就为null
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }   
2.8 弱一致性迭代器

下面通过两个例子对LinkedBlockingQueue.Itr.nextNode中的条件进行复现,留作参考。
(1)nextNode中"s == p"为true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.take();
        lbq.take();
        // 此处设断点,进入nextNode方法,会发现s == p成立
        iterator.next();
    }

(2)nextNode中"s != null && s.item == null"为true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.remove("a");
        lbq.take();
        // 此处设断点调试,进入nextNode方法,会
        // 发现s != null && s.item == null成立
        iterator.next();
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容