ConcurrentLinkedQueue原理解析

JDK中,提供了一个ConcurrentLinkedQueue类来实现高并发的队列。这个队列使用链表作为其数据结构。这个类算是高并发环境中性能最好的队列。高性能是因为其内部复杂的实现。

CAS

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

ConcurrentLinkedQueue是一种线程安全的队列。他是使用非阻塞算法(CAS)来实现线程安全的。ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedQueue结构

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点

private static class Node<E> { 
       private volatile  E item;           // 声明为 volatile 型
       private volatile  Node<E> next;    // 声明为 volatile 型

       Node(E item) {                       // 创建新节点
           lazySetItem(item);              // 惰性设置 item 域的值
        } 

       E getItem() { 
           return item; 
       } 

       boolean casItem(E cmp, E val) {   // 使用 CAS 指令设置 item 域的值
           return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 
       } 

       void setItem(E val) {  // 使用“volatile 写”的方式,设置 item 域的值
            item = val; 
       } 
       void lazySetItem(E val) { //惰性设置 item 域的值
                UNSAFE.putOrderedObject(this, itemOffset, val); 
       } 

       void lazySetNext(Node<E> val) {    // 惰性设置 next 域的值 
           UNSAFE.putOrderedObject(this, nextOffset, val); 
       } 

       Node<E> getNext() { 
           return next; 
       } 

                                                     //CAS 设置 next 域的值
       boolean casNext(Node<E> cmp, Node<E> val) { 
           return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
       } 

       private static final sun.misc.Unsafe UNSAFE=     // 域更新器
       sun.misc.Unsafe.getUnsafe(); 
       private static final long nextOffset=              //next 域的偏移量
       objectFieldOffset(UNSAFE, "next", Node.class); 
       private static final long itemOffset=              //item 域的偏移量
       objectFieldOffset(UNSAFE, "item", Node.class); 
 }

其实这是一个静态内部类,item是来表示元素的。next表示当前Node的下一个元素。
对Node进行操作时,使用了CAS操作:

//表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

//和上个方法类似,是用来设置next字段。
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

ConcurrentLinkedQueue内部有两个重要的字段:head和tail,分别表示链表的头部和尾部,当然都得是Node类型。对于head来说,它永远不会为null,并且通过head和succ()后继方法一定能完整地遍历整个链表。对于tail来说,它自然表示队列的末尾。
以下是构造函数:

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

但是ConcurrentLinkedQueue的内部实现非常复杂,它允许在运行时链表处于多个不同的状态。以tail为例,一般来说我们期望tail总是作为链表的末尾,但实际上tail的更新并不是及时的,可能会产生拖延现象。就是说,有可能指向的并不是真正的尾巴,真正的可能在后面一个或者多个。

再来看看Node类的结构:

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        .......
}

Node节点主要包含了两个域:一个是数据域item,另一个是next指针,用于指向下一个节点从而构成链式队列。并且都是用volatile进行修饰的,以保证内存可见性
另外ConcurrentLinkedQueue含有这样两个成员变量:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

说明ConcurrentLinkedQueue通过持有头尾指针进行管理队列。当我们调用无参构造器时,其源码为:

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

head和tail指针会指向一个item域为null的节点,此时ConcurrentLinkedQueue状态如下图所示:
如图,head和tail指向同一个节点Node0,该节点item域为null,next域为null。


image.png

操作Node的几个CAS操作

在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。
可以看出在处理器指令集能够支持CMPXCHG指令后,在java源码中涉及到并发处理都会使用CAS操作,那么在ConcurrentLinkedQueue对Node的CAS操作有上面提到的这几个:

//更改Node中的数据域item   
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

可以看出这些方法实际上是通过调用UNSAFE实例的方法,UNSAFE为sun.misc.Unsafe类,该类是hotspot底层方法,目前为止了解即可,知道CAS的操作归根结底是由该类提供就好。

节点类型说明

有效节点:从 head 向后遍历可达的节点当中,item 域不为 null 的节点。
无效节点:从 head 向后遍历可达的节点当中,item 域为 null 的节点。
已删除节点:从 head 向后遍历不可达的节点。
哨兵节点:链接到自身的节点(哨兵节点同时也是以删除节点)。
头节点:队列中的第一个有效节点(如果有的话)。
尾节点:队列中 next 域为 null 的节点(可以是无效节点)。


image.png

在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式 ( 基本不变式,head 的不变式和 tail 的不变式 ),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性
基本不变式
在执行方法之前和之后,队列必须要保持的不变式:

  • 当入队插入新节点之后,队列中有一个 next 域为 null 的(最后)节点。
  • 从 head 开始遍历队列,可以访问所有 item 域不为 null 的节点。

head 的不变式和可变式
在执行方法之前和之后,head 必须保持的不变式:

  • 所有“活着”的节点(指未删除节点),都能从 head 通过调用 succ() 方法遍历可达。
  • head 不能为 null。
  • head 节点的 next 域不能引用到自身。
    在执行方法之前和之后,head 的可变式:
  • head 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。

tail 的不变式和可变式
在执行方法之前和之后,tail 必须保持的不变式:

  • 通过 tail 调用 succ() 方法,最后节点总是可达的。
  • tail 不能为 null。
    在执行方法之前和之后,tail 的可变式:
  • tail 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
  • tail 节点的 next 域可以引用到自身。

入队列过程

入队列就是将入队节点添加到队列的尾部,在 ConcurrentLinkedQueue 中,插入新节点时,不用考虑尾节点是否为有效节点,直接把新节点插入到尾节点的后面即可。由于 tail 可以指向任意节点,所以入队时必须先通过 tail 找到尾节点,然后才能执行插入操作。如果插入不成功(说明其他线程已经抢先插入了一个新的节点)就继续向后推进。重复上述迭代过程,直到插入成功为止。

/**
     * Inserts the specified element at the tail of this queue.
     * 在队列后插入一个元素
     * As the queue is unbounded, this method will never return {@code false}.
     * 因为队列是无限大的,这个方法永远不会回false
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * 返回true,
     * @throws NullPointerException if the specified element is null
     * 如果某元素为空则抛出空指针异常
     */
    public boolean offer(E e) {
        checkNotNull(e);//检查e是否为空
        final Node<E> newNode = new Node<E>(e);//创建新节点

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) { //1
                // p is last node. p是最后一个节点
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        //每两次,更新一下tail
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试。
            }
            else if (p == q)//2
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                //遇到哨兵节点,从head开始遍历。
                //但如果tail被修改,则使用tail(因为可能被修改正确了)
                p = (t != (t = tail)) ? t : head;
            else           //3
                // Check for tail updates after two hops.取下一个节点或者最后一个节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

这个方法没有任何锁操作。线程安全完全由CAS操作和队列的算法来保证。整个算法的核心是for循环,这个循环没有出口,知道尝试成功,这也符合CAS操作的流程。

当第一次加入元素的时候,由于队列为空,因此p.next为null。程序进入1处,程序将p的next节点赋值为newNode,也就是将新的元素加入到队列中。此时p==t成立,也就是不会执行casTail()代码更新tail末尾。如果casNext()成功,程序直接返回,如果失败,则再进行一次循环尝试,直到成功。因此,增加一个元素后,tail并不会被更新。

当程序尝试加入第二个元素时,由于此时t还在head的位置上,因此p.next指向实际的第一个元素,因此1处的q!=null,这表示q不是最后的节点。由于往队列中增加元素需要最后一个节点,因此,循环开始中查找最后一个节点。于是,程序进入3处,获得最后一个节点。此时,p实际上是指向链表中的第一个元素,它的next是null,故在第二个循环的时候,进入1处,p更新自己的next,让它指向新加入的节点。如果成功,由于此时p!=t成功,则会更新t所在位置,将t移到链表最后。
2处处理了p==q的情况。这种情况是由于遇到了哨兵节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点或者空节点。当遇到哨兵节点的时候,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步查找到链表末尾。但一旦发生在执行过程中,tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾,这样就避免了重新查找tail的开销。


image.png
  • 添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  • 添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
  • 添加元素3,设置tail节点的next节点为元素3节点。
  • 添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,在入队列前如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点

上面的分析从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再看看源码来详细分析下它是如何使用CAS方式来入队的(JDK1.8):

    public boolean offer(E e) {
        checkNotNull(e);
        //创建入队节点
        final Node<E> newNode = new Node<E>(e);
        //t为tail节点,p为尾节点,默认相等,采用失败即重试的方式,直到入队成功
        for (Node<E> t = tail, p = t;;) {
            //获得p的下一个节点
            Node<E> q = p.next;
            // 如果下一个节点是null,也就是p节点就是尾节点
            if (q == null) {
              //将入队节点newNode设置为当前队列尾节点p的next节点
                if (p.casNext(null, newNode)) { 
                   //判断tail节点是不是尾节点,也可以理解为如果插入结点后tail节点和p节点距离达到两个结点
                    if (p != t) 
                     //如果tail不是尾节点则将入队节点设置为tail。
                     // 如果失败了,那么说明有其他线程已经把tail移动过 
                        casTail(t, newNode);  
                    return true;
                }
            }
                 // 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回                            head节点
            else if (p == q)
                p = (t != (t = tail)) ? t : head;
            else
                //p有next节点,表示p的next节点是尾节点,则需要重新更新p后将它指向next节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

从源代码我们看出入队过程中主要做了三件事情,第一是定位出尾节点;第二个是使用CAS指令将入队节点设置成尾节点的next节点,如果不成功则重试;第三是重新定位tail节点。

从第一个if判断就来判定p有没有next节点如果没有则p是尾节点则将入队节点设置为p的next节点,同时如果tail节点不是尾节点则将入队节点设置为tail节点。如果p有next节点则p的next节点是尾节点,需要重新更新p后将它指向next节点。还有一种情况p等于p的next节点说明p节点和p的next节点都为空,表示这个队列刚初始化,正准备添加数据,所以需要返回head节点。

出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。


image.png

从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。让我们再通过源码来深入分析下出队过程(JDK1.8):

    public E poll() {
        // 设置起始点  
        restartFromHead:
        for (;;) {
        //p表示head结点,需要出队的节点
            for (Node<E> h = head, p = h, q;;) {
            //获取p节点的元素
                E item = p.item;
                //如果p节点的元素不为空,使用CAS设置p节点引用的元素为null
                if (item != null && p.casItem(item, null)) {

                    if (p != h) // hop two nodes at a time
                    //如果p节点不是head节点则更新head节点,也可以理解为删除该结点后检查head是否与头结点相差两个结点,如果是则更新head节点
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                //如果p节点的下一个节点为null,则说明这个队列为空,更新head结点
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //结点出队失败,重新跳到restartFromHead来进行出队
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

首先获取head节点的元素,并判断head节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将head节点的引用设置成null,如果CAS成功,则直接返回head节点的元素,如果CAS不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取head节点。如果p节点的下一个节点为null,则说明这个队列为空(此时队列没有元素,只有一个伪结点p),则更新head节点。

队列判空

有些人在判断队列是否为空时喜欢用queue.size()==0,让我们来看看size方法:

public int size() 
{
     int count = 0;
     for (Node<E> p = first(); p != null; p = succ(p))
         if (p.item != null)
             // Collection.size() spec says to max out
             if (++count == Integer.MAX_VALUE)
                 break;
     return count;
 }

可以看到这样在队列在结点较多时会依次遍历所有结点,这样的性能会有较大影响,因而可以考虑empty函数,它只要判断第一个结点(注意不一定是head指向的结点)。

  public boolean isEmpty() {
        return first() == null;
    }

HOPS的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。

head更新触发时机当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的大概原因是这个(猜的 :)),从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容