ConcurrentLinkedQueue全解析

使用java的concurrent包写程序已经快两三年了,至今对其实现原理不怎么了解,突然感到非常惭愧。下定决心从这个包里面找几个核心类,从源码级别做一个全面的分析,搞懂它们的实现原理,以及它们为啥这么快,这一篇挑选ConcurrentLinkedQueue。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个无锁化、非阻塞、线程安全的单向队列,JDK1.5提供,由大名鼎鼎的Doug Lea编写。不过要是你认为这是大神灵感爆发的杰作,那就错了,仔细读读java源文件的注释就知道,这个类的实现原理来自Michael & Scott设计的算法(请参考论文),而Michael & Scott也是在很多前人研究的基础上加以综合完善得到该算法的,成为很多具体平台并发FIFO队列实现的蓝本。如果不读读这篇论文,直接阅读java源码,光凭那点注释,会有点摸不着头脑。

我们要研究的ConcurrentLinkedQueue可以说是这个算法的java版本,当然针对java语言做了很多修改。到这里,你要是认为Doug Lea就干了点将原始算法翻译成java的活,那又错了,从一个理论算法到具体某个语言的实现还有相当长的距离;而且java语言本来就比较慢,所以为了挖掘性能,作者做了大量精细而巧妙的设计。

ConcurrentLinkedQueue实现的依赖cas操作和java的valotile语义,所以不理解这两点是无法看懂代码的。关于cas操作和valotile关键字,不是本文要讲的内容,有很多的资料可以查询,比如:
JAVA中的CAS
volatile关键字解析

说明:

  1. java里面并没有指针的概念,但是算法原理结构理论讲队列,有“头指针”、“尾指针”的说法,为了方便,下面在讲解ConcurrentLinkedQueue内部结构时也会使用这两个术语;
  2. 采用的源码是Java 1.8版本;
  3. 强烈建议先阅读cas和java内存模型相关资料。

一、队列的内部链表结构

下面是摘录自源码,经过简化的ConcurrentLinkedQueue定义片段。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
     
    private transient volatile Node<E> head;
     private transient volatile Node<E> tail;

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
 }

一个空的ConcurrentLinkedQueue包含一个空节点(数据字段=null),队列的HEAD和TAIL指针都指向这个节点。当经过一些出入队列的操作以后,队列的结构可能类似下面:

【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
HEAD                                                                    TAIL

但是,也有可能类似这样:
【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
                  HEAD                                  TAIL

上边这个结构,表示N1节点已经出了队列,此时的HEAD是N2节点,但是N1节点的next指针仍然保持着。而TAIL节点指向的N5却不是真正的队尾,队尾是N6。这是因为在入队列的过程中,由于并发,导致TAIL更新不及时。

同样还是上面的结构,如果我把里面的数据字段标识出来,有可能是这样的:
【N1(null)】->【N2(null)】->【N3(data)】->【N4(data)】->【N5(data)】->【N6(data)】
                          HEAD                                                                 TAIL
HEAD节点的数据字段是NULL,说明这是一个空节点,这个队列语义上第一个节点应该是N3才对。

再来看一个状态:
【N1】<-| 【N2】->【N3】->【N4】->【N5】->【N6】
                  HEAD                                  TAIL
N1节点的next字段指向自身,这表示N1节点彻底断开和队列的关系,为什么不直接将N1的next置为null呢,因为这样就不能直接断定N1到底是不是尾节点了。

上面这些情形是ConcurrentLinkedQueue在执行一些操作之后可能处于的状态,之所于允许这些看起来不够严谨的状态,是为了在并发过程中提高效率。但是不管如何,在整个生命周期内,算法保持以下属性:

  1. 链表里面至少会有一个节点,数据字段为null的节点是空节点;
  2. 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
  3. TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
  4. 和队列断开的节点,next字段指向自身;
  5. 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。

在ConcurrentLinkedQueue内部链表上,可能有一个或多个数据字段为null的空节点,空节点虽然没有数据,但是对高效地保持链表的连接状态至关重要。另一方面,节点的数据字段和next字段值的变化有很强的规律性:数据字段在节点入队列是不为null,出队列时变为null;next字段入队列时是null,出队列时先保持不变,再指向自身。这种规律性可以有效规避掉cas操作的ABA问题。

上面这些也可以认为是ConcurrentLinkedQueue实现算法的不变式,有些在源码注释里面就有说明,有些是我总结的。带着这些原则才能理解每个方法的实现为什么是这样的。

二、内部节点类

直接上源码吧,由于Node提供一个比较简单的节点数据结构,逻辑不多,但是我在读的过程中还是有一些疑惑,直接将解释写在下面的源码块里面了。

private static class Node<E> {
       volatile E item;
       volatile Node<E> next;

       /**
        * Constructs a new node.  Uses relaxed write because item can
        * only be seen after publication via casNext.
        */
       Node(E item) {
           UNSAFE.putObject(this, itemOffset, item);
       }
       为什么要使用UNSAFE.putObject而不直接赋值呢?刚看到“relaxed write”这个注释时一脸懵逼,
       后来才搞明白,因为item是volatile修饰的,如果直接赋值,会触发volatile的内存同步语义,
       在初始化阶段不需要如此,所以使用UNSAFE.putObject能提高一些性能。

       boolean casItem(E cmp, E val) {
           return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
       }
       cas操作设置数据字段,这个操作在出队列操作时,保证并发安全。

       void lazySetNext(Node<E> val) {
           UNSAFE.putOrderedObject(this, nextOffset, val);
       }
       这个操作相比UNSAFE.putObject,会一定程度禁止指令重排,
       相比volatile赋值,不保证全局可见性,性能也稍好一些,用于节点出队列后断开链接,

       boolean casNext(Node<E> cmp, Node<E> val) {
           return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
       }
       给节点设置next字段,入队列的关键操作,cas操作保证并发安全
       
       // Unsafe mechanics
       private static final sun.misc.Unsafe UNSAFE;
       private static final long itemOffset;
       private static final long nextOffset;
 } 
  

三、出队列:poll方法

poll的目标是安全地取出第一个有效节点的数据,如果没有返回null。
源码如下,为了方便分析,对关键行,我加上了L1~Ln这样的行标记 。

    public E poll() {
        restartFromHead:
        for (;;) {
        L1 for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

        L2      if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
        L3          if (p != h) // hop two nodes at a time
        L4              updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
        L5      else if ((q = p.next) == null) {
        L6          updateHead(h, p);
                    return null;
                }
        L7      else if (p == q)
                    continue restartFromHead;
                else
        L8          p = q;
            }
        }
    }
  • L1行:开始一个类似遍历的for循环,初始化h,p为head节点,p是当前检查的节点,q是下一个节点。
  • L2行:如果p节点的数据不为空,那么p肯定就是真正的第一个节点,只要能将节点的数据字段(item)置为null,出队列就算成功(参考第一节总结的节点属性);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。

L2行如果执行成功,那么出队列可以算是完成了,此时的head指向一个空节点,这种情况是允许的,但是队列里面也不能总是留很多空节点,所以L3L4行,就是把head指针后移。

  • L3行:p==h,说明刚出队列的p就是遍历开始的第一个节点,有2种情况,1)没有并发poll,head指向了空节点,队列里面只有且仅有一个空节点,此时没有必要做head移动;2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
  • L4行:如果p.next==null,说明p是尾节点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
  • L5行:无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了(参考第一节总结的节点属性),此时将head置为尾节点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
  • L7行:p==q实际上就是p==p.next,也即p的next指向自身,这说明p节点已经和链表断开了(参考第一节总结的节点属性),这种情况下,只能重新开始poll。
  • L8行:正常迭代下一个节点。

上面L2~L4行,每个线程在成功poll一个节点之后,就会尝试把头结点移到p的下一个节点,这样保证了head后面最多只有一个空节点。L3行的那个判断如果没有,也不会影响程序的正确性,但是性能会差一些。

再介绍一下更新头指针的updateHead方法:

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

将头指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集,这个操作也维护了第一节总结的节点属性。值得注意的是h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到L5行时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么L7行的判断也可能失败,最终执行到L8行,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为,作者经过测量,认为这种情况造成的性能损失低于volatile造成的性能损失)。

还有一个有意思的现象是,在poll的过程中,并不会去修正TAIL指针,所以在链表的TAIL指针是有可能落在HEAD之后的,甚至暂时指向一个已经出队列的节点。

四、入队列:offer方法

offer的目标是将新节点插入到尾节点的后面,即使在并发情况下也不丢失数据。

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

     L1 for (Node<E> t = tail, p = t;;) {
     L2     Node<E> q = p.next;
     L3     if (q == null) {
                // p is last node
     L4         if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
     L5            if (p != t) // hop two nodes at a time
     L6                 casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
     L7     else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
     L8        p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
     L9         p = (p != t && t != (t = tail)) ? t : q;
        }
    }
  • L1行:开启一个从tail指针开始遍历的循环,p指向当前疑似尾节点;
  • L2行:初始化q=p的下一个节点;
  • L3行:如果q==null,说明此刻p确实是尾节点,新节点应该插到后面;
  • L4行:casNext执行安全的插入操作;
    如果成功插入,那么应该考虑更新tail指针,L5L6行执行这个操作;
  • L5行:和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个节点;
  • L6行:cas操作安全移动tail指针。
    后面的两个if分支说明p不是尾节点。
  • L7行:p==q,也即p的next字段指向自身,说明p是一个脱离了链表的节点(并发poll操作造成的,参考第一节总结的节点属性)),需要找一个节点重新开始遍历。
  • L8行:从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
  • L9行:按道理直接p=q跳到下一个节点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。(作者应该经过测量,可见对性能的压榨已经丧心病狂了)。

移除操作:remove

再来看移除操作,通过数据字段的比较,来移除数据相等的节点。

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
        L1   if (item != null) {
        L2      if (!o.equals(item)) {
        L3          next = succ(p);
        L4          continue;
                }
        L5      removed = p.casItem(item, null);
            }

        L6  next = succ(p);
        L7  if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}
  • L1行:判断当前是不是空节点。
  • L2行:如果节点数据与输入参数相等,说明这是一个需要移除的节点,否则应该跳到下一个节点;
  • L3/4行:节点数据不相等,取出当前节点p的下一个节点,然后重新开始循环;
  • L5行:节点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
  • L6行:执行到这一行说明,p是空节点,(本来就是空,或在L5被清空);取出next节点;
  • L7行:由于p是空节点,尝试将p的前继和后继相连;

上面再取p的下一个节点的时候,用了调用了succ这个方法,这里也看一下:

   final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

很简单,如果p的next指向自己,说明p已经脱离链表,此时返回head指向的节点。

源码可以看出,如果队列里面有多个相同数据的节点,一次remove调用最多删除一个。

迭代器

private class Itr implements Iterator<E> {
     L1 private Node<E> nextNode;
     L2 private E nextItem;
     L3 private Node<E> lastRet;

        Itr() {
     L4     advance();
        }
        private E advance() {
     L5     lastRet = nextNode;
     L6     E x = nextItem;

            Node<E> pred, p;
     L7       if (nextNode == null) {
     L8           p = first();
                pred = null;
            } else {
                pred = nextNode;
     L9         p = succ(nextNode);
            }

            for (;;) {
     L10        if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
     L11        if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls
     L12            Node<E> next = succ(p);
     L13            if (pred != null && next != null)
     L14                pred.casNext(p, next);
     L15            p = next;
                }
            }
        }

        public boolean hasNext() {
     L16    return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
     L17    return advance();
        }

        public void remove() {
     L18    Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
     L19    l.item = null;
     L20    lastRet = null;
        }
    }

源代码稍微长点,我删掉了注释。

  • L1行:成员变量nextNode指向下一个节点,用来支持hasNext,next等操作;
  • L2行:成员变量nextItem是下一个节点的值,即nextNode的item值;为什么单独需要这个变量呢,因为由于并发,nextNode的item字段有可能被置空,而迭代器在hasNext的时候是不允许返回空值的,所以在迭代到nextNode的时候,立即取出item字段保存起来。
  • L3行:成员变量lastRet指向上一个节点,用来支持删除操作;
  • L4行:构造函数仅调用advance方法,从名字上推测,应该是将迭代器往后推一步;
  • L5行:核心方法advance的第一行,先保存lastRet,因为这个方法执行完了以后,nextNode就指向下一个节点了;
  • L6行:临时变量x保存nextItem,因为advance执行完以后,nextItem就变成下一个节点的值了;
  • L7行:如果nextNode==null,说明这是初始化操作;
  • L8行:直接调用p=first(),将p赋值成第一个有效节点或null(队列空的);
  • L9行:不是初始化调用,那么p赋值成nextNode的后继节点;
    至此p保存了nextNode的备选值
  • L10行:如果p==null,说明已经到了链表尾部了,迭代结束;
  • L11行:如果p.item!=null,说明p是有效节点,ok,就是它了;
  • L12~L15行:如果p.item==null,说明p是无效节点,我们应该跳过它,继续寻找;
  • L13~L14行:如果p是空节点,我们尝试将它的前继与后继节点相连;
  • L16行:hasNext()方法的实现,就看nextNode是否空;
  • L17行:next()方法,很简单就是调用advance;
  • L18~L20行:remove()方法,将lastRet指向节点的数据清空;因为当我们调用next方法时,返回的值对应的是lastRet节点。

从上面的实现我们可以知道,在迭代过程中,如果有数据并发入队列,这些数据是可以被迭代到的;如果有值被并发删除或出队列,那么这些数据有可能也被迭代到。尤其是迭代器的remove方法,删除的有可能是一个不存在的数据。

队列长度

ConcurrentLinkedQueue的size方法是很没效率的的,实际是把队列遍历了一遍来计算长度。所以推荐大家使用isEmpty来判断队列是否为空,而不要使用size()==0。

public boolean isEmpty() {
    return first() == null;
}

 public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

为什么ConcurrentLinkedQueue内部不维护一个size变量来跟踪队列的长度呢?不是不想,是做不到,由于ConcurrentLinkedQueue使用无锁化设计,通过cas操作来保证并发安全。而cas操作只能保证单个变量的并发安全性,无法在出入队列操作的同时,维护size变量。

小结

上面基本讲解了ConcurrentLinkedQueue源代码的大部分核心内容。在开篇的时候也讲过,高效的队列算法不是一朝一夕的出来的,看似短短一两百行代码,是经过前人大量的研究才得到的;因此我们读起来有些吃力是理所当然的,如果不去读一读相关的论文及其他资料,那就更困难了。ConcurrentLinkedQueue的代码在深入挖掘jvm特性的基础上,做了不少性能优化,比如offer方法的L8L9行;当然,上文的解释是我的推测,不一定正确,如果大家发现谬误之处,欢迎指正。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容