LinkedBlockingQueue源码分析

LinkedBlockingQueue是用链表实现的FIFO队列,该队列大小默认是Integer.MAX_VALUE,可以认为是无界的,也可指定大小,使其有界。基于链表的队列通常比基于数组的队列有更高的吞吐量,但大多数并发场景下,实际的性能优劣不易测量。
LinkedBlockingQueue通过一个哑铃节点、takeLock和putLock、(AtomicInteger类型的)count,以及级联通知实现了双锁队列算法(的变体),提高了性能。
java.util包下的迭代器是fail-fast的,即当迭代器创建后,若外部通过非迭代器自身的方法修改集合的内容,会抛出ConcurrentModificationException异常;而java.util.concurrent包下的迭代器是弱一致性的,即当迭代器创建后,其他线程通过非迭代器自身的方法修改集合的内容并不会抛出异常。

1. LinkedBlockingQueue继承关系图

2. LinkedBlockingQueue源码分析

2.1 内部类
    // 底层单向链表的Node
    static class Node<E> {
        E item;
        // 为了LinkedBlockingQueue.Itr的弱一致性,可能出现next指向当前节点自身的情况
        Node<E> next;
        Node(E x) { item = x; }
    }
2.2 字段
    // 队列容量(未指定时默认是Integer.MAX_VALUE)
    private final int capacity;

    // 队列中元素的个数
    // 因为添加、获取(删除)操作使用各自的锁,且两种操作都会修改count
    // 的值,因此使用AtomicInteger类型的count来保证原子性(和可见性)
    private final AtomicInteger count = new AtomicInteger();
    
    // 底层链表的头节点
    transient Node<E> head;
    
    // 底层链表的尾节点
    private transient Node<E> last;
    
    // take、poll等方法使用的ReentrantLock和Condition
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    
    // put、offer等方法使用的ReentrantLock和Condition
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
2.3 三个构造方法

(1)LinkedBlockingQueue(int)

    public LinkedBlockingQueue(int capacity) {
        // 检查capacity
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // last和head初始时都指向该哑铃节点,正是由于该节点,实现了添加元素和获取(删除)元素
        // 两种操作的解耦,使得LinkedBlockingQueue这个基于链表的阻塞队列可以采用双锁队列算法
        last = head = new Node<E>(null);
    }

(2)LinkedBlockingQueue()

    public LinkedBlockingQueue() {
        // 调用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
    }

(3)LinkedBlockingQueue(Collection)

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 调用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
        // 要(在队尾)添加元素,所以使用putLock锁
        final ReentrantLock putLock = this.putLock;
        // 实例化时不存在多线程竞争,这里上锁是为了可见性
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                // e不能为null
                if (e == null)
                    throw new NullPointerException();
                // 注意n从0开始
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e)); // 入队
                ++n;
            }
            // 更新count.value的值
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
2.4 辅助方法

(1)enqueue

    // 将node添加到队列末尾
    private void enqueue(Node<E> node) {
        // 该语句等价转化为:
        //       last.next = node;
        //       last = last.next;
        last = last.next = node;
    }

(2)dequeue

    // 从头部删除一个节点
    private E dequeue() {
        // 注意:head指向的是哑铃节点
        Node<E> h = head;
        Node<E> first = h.next;
        // 不直接将h.next置为null是为了LinkedBlockingQueue.Itr的弱一致性
        h.next = h; // help GC
        head = first;
        // 获取first.item作为返回值
        E x = first.item;
        // 将first.item置为null,作为新的哑铃节点
        first.item = null;
        return x;
    }

(3)signalNotEmpty

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // 先获取takeLock锁
        // notEmpty.signal中会判断当前线程是否持有锁,所以要调用
        // notEmpty的signal方法,必须持有takeLock锁,否则会抛出异常      
        takeLock.lock();
        try {
            // 调用notEmpty.signal后,会将条件队列中第一个调用了notEmpty.await或awaitNanos(nanos)
            // 的线程对应的Node转移到同步队列中,前驱节点调用unpark后才算真正唤醒该线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

(4)signalNotFull

    // 与signalNotEmpty类似
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        // 先获取putLock锁
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

(5)fullyLock

    // 上锁
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }   

(6)fullyUnlock

    // 解锁
    // 以一种顺序上锁,以相反的顺序释放锁,可避免死锁
    // (这里上锁顺序或解锁顺序颠倒也不会出现死锁)
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
2.5 添加元素

(1)put-无限期阻塞版

    public void put(E e) throws InterruptedException {
        // e不能为null
        if (e == null) throw new NullPointerException();
        // 约定put、take等方法中将c初始化为-1,表示入队失败
        int c = -1;
        // 将e封装成Node
        Node<E> node = new Node<E>(e);
        // 要(在队尾)添加元素,所以使用putLock锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
            // 获取count.value的值,若为capacity则阻塞当前线程
            // 可能出现当前线程被唤醒后队列大小仍是capacity的情况,当前线程会再次阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 注意:getAndIncrement中会以CAS的方式将count.value加1并返回count.value原来的值
            c = count.getAndIncrement();
            // 注意c是本次入队前count.value的值
            // 这两行语句算是级联通知结构的一部分,若不采用级联通知,即将这两行语句替换为
            // signalNotEmpty();则每次入队都要获取takeLock,调用notEmpty.signal,极大降低了性能
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c为0表示队列中有一个元素
        if (c == 0)
            // 为了降低添加元素相关操作对takeLock的依赖,采用了级联通知
            // 级联通知:
            //     只要队列中有一个元素,就通过signalNotEmpty获取takeLock锁,并调用notEmpty.signal
            //     将一个消费者线程(若存在的话)加入同步队列(此时可能没有消费者线程,这种情况下,后
            //     面过来的消费者线程是不会阻塞的,因为队列中已经有元素了、也可能有多个阻塞着的消费
            //     者线程),后面过来的线程和同步队列中的线程竞争锁,竞争到锁的线程会执行出队操作,
            //     若队列不为空,该线程会调用notEmpty.signal,如此往复......若队列为空了,消费者线程
            //     会阻塞,又会通过这里的signalNotEmpty进行处理。
            signalNotEmpty();
    }   

(2)offer(E)-非阻塞版

    public boolean offer(E e) {
        // e不能为null(略)
        final AtomicInteger count = this.count;
        // 先获取count.value的值,尝试判断一下
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 与put中相同的代码不再贴出
            // 当其他线程在上面if之后又添加元素时再次获取的
            // count.value就为capacity,条件为false,c为-1
            if (count.get() < capacity) {
                // 入队
                enqueue(node);
                // 再次强调:c是count.value入队前的值
                c = count.getAndIncrement();
                // 级联通知相关
                if (c + 1 < capacity)
                    notFull.signal();
            }
        // 若c仍为-1,表示入队失败,返回false,否则返回true
        return c >= 0;
    }   

(3)offer(E,long,TimeUnit)-限期阻塞版

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 与put中相同的代码不再贴出
        // 转化为纳秒数
        long nanos = unit.toNanos(timeout);
        while (count.get() == capacity) {
                // 队列仍是满的,但阻塞时长已到,在这里跳出
                if (nanos <= 0)
                    return false;
                // 阻塞指定时长,也可能被提前唤醒
                nanos = notFull.awaitNanos(nanos);
            }
        return true;
    }
2.6 获取(删除)元素

(1)take-阻塞版

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 要(在队t头)删除元素,所以使用takeLock锁
        takeLock.lockInterruptibly();
        try {
            // 队列为空则阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            // 强调:c是count.value入队前的值
            c = count.getAndDecrement();
            // c为1说明经过上面的出队操作队列已空
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c为capacity说明当前队列中的元素个数是capacity-1
        if (c == capacity)
            // 为了降低添加元素相关操作对putLock的依赖,采用了级联通知
            // 与put的级联通知是相对应的,不再介绍
            signalNotFull();
        return x;
    }   

poll()-非阻塞版和poll(long,TimeUnit)-限期阻塞版与offer(E)-非阻塞版和offer(E,long,TimeUnit)-限期阻塞版是相对应的,不再介绍。
(2)remove-直接删除元素

    // 注意take、poll等方法是获取元素,remove是直接删除队列中第一个满足o.equals(p.item)的元素
    public boolean remove(Object o) {
        // 队列中的元素不允许为null,所以这里o也不能为null
        if (o == null) return false;
        // 因为要操作的是整个队列,所以使用双锁
        fullyLock();
        try {
            // 遍历链表
            // trail是p的前一个节点,trail初始为哑铃节点
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    // 找到满足o.equals(p.item)的节点,断开该节点
                    unlink(p, trail);
                    //  已移除一个,返回true
                    return true;
                }
            }
            // 未找到匹配的节点,返回false
            return false;
        } finally {
            // 释放双锁
            fullyUnlock();
        }
    }
    void unlink(Node<E> p, Node<E> trail) {
        // 为了LinkedBlockingQueue.Itr的弱一致性,不将p.next置为null
        p.item = null;
        // 进行连接
        trail.next = p.next;
        // p是否是最后一个节点
        if (last == p)
            last = trail;
        // 若队列中元素个数为capacity-1,则进行级联通知
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
2.7 查看元素
    public E peek() {
        // 条件成立说明队列为空
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // 要查看对头元素的值,因此使用takeLock
        takeLock.lock();
        try {
            // 获取头节点的下一个节点
            // 注意头节点是哑铃节点
            Node<E> first = head.next;
            // 当在上面if之后,其他线程将队列中的元素取完时,这里first就为null
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }   
2.8 弱一致性迭代器

下面通过两个例子对LinkedBlockingQueue.Itr.nextNode中的条件进行复现,留作参考。
(1)nextNode中"s == p"为true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.take();
        lbq.take();
        // 此处设断点,进入nextNode方法,会发现s == p成立
        iterator.next();
    }

(2)nextNode中"s != null && s.item == null"为true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.remove("a");
        lbq.take();
        // 此处设断点调试,进入nextNode方法,会
        // 发现s != null && s.item == null成立
        iterator.next();
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容