使用java的concurrent包写程序已经快两三年了,至今对其实现原理不怎么了解,突然感到非常惭愧。下定决心从这个包里面找几个核心类,从源码级别做一个全面的分析,搞懂它们的实现原理,以及它们为啥这么快,这一篇挑选ConcurrentLinkedQueue。
ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个无锁化、非阻塞、线程安全的单向队列,JDK1.5提供,由大名鼎鼎的Doug Lea编写。不过要是你认为这是大神灵感爆发的杰作,那就错了,仔细读读java源文件的注释就知道,这个类的实现原理来自Michael & Scott设计的算法(请参考论文),而Michael & Scott也是在很多前人研究的基础上加以综合完善得到该算法的,成为很多具体平台并发FIFO队列实现的蓝本。如果不读读这篇论文,直接阅读java源码,光凭那点注释,会有点摸不着头脑。
我们要研究的ConcurrentLinkedQueue可以说是这个算法的java版本,当然针对java语言做了很多修改。到这里,你要是认为Doug Lea就干了点将原始算法翻译成java的活,那又错了,从一个理论算法到具体某个语言的实现还有相当长的距离;而且java语言本来就比较慢,所以为了挖掘性能,作者做了大量精细而巧妙的设计。
ConcurrentLinkedQueue实现的依赖cas操作和java的valotile语义,所以不理解这两点是无法看懂代码的。关于cas操作和valotile关键字,不是本文要讲的内容,有很多的资料可以查询,比如:
JAVA中的CAS
volatile关键字解析
说明:
- java里面并没有指针的概念,但是算法原理结构理论讲队列,有“头指针”、“尾指针”的说法,为了方便,下面在讲解ConcurrentLinkedQueue内部结构时也会使用这两个术语;
- 采用的源码是Java 1.8版本;
- 强烈建议先阅读cas和java内存模型相关资料。
一、队列的内部链表结构
下面是摘录自源码,经过简化的ConcurrentLinkedQueue定义片段。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
}
一个空的ConcurrentLinkedQueue包含一个空节点(数据字段=null),队列的HEAD和TAIL指针都指向这个节点。当经过一些出入队列的操作以后,队列的结构可能类似下面:
【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
HEAD TAIL
但是,也有可能类似这样:
【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
HEAD TAIL
上边这个结构,表示N1节点已经出了队列,此时的HEAD是N2节点,但是N1节点的next指针仍然保持着。而TAIL节点指向的N5却不是真正的队尾,队尾是N6。这是因为在入队列的过程中,由于并发,导致TAIL更新不及时。
同样还是上面的结构,如果我把里面的数据字段标识出来,有可能是这样的:
【N1(null)】->【N2(null)】->【N3(data)】->【N4(data)】->【N5(data)】->【N6(data)】
HEAD TAIL
HEAD节点的数据字段是NULL,说明这是一个空节点,这个队列语义上第一个节点应该是N3才对。
再来看一个状态:
【N1】<-| 【N2】->【N3】->【N4】->【N5】->【N6】
HEAD TAIL
N1节点的next字段指向自身,这表示N1节点彻底断开和队列的关系,为什么不直接将N1的next置为null呢,因为这样就不能直接断定N1到底是不是尾节点了。
上面这些情形是ConcurrentLinkedQueue在执行一些操作之后可能处于的状态,之所于允许这些看起来不够严谨的状态,是为了在并发过程中提高效率。但是不管如何,在整个生命周期内,算法保持以下属性:
- 链表里面至少会有一个节点,数据字段为null的节点是空节点;
- 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
- TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
- 和队列断开的节点,next字段指向自身;
- 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。
在ConcurrentLinkedQueue内部链表上,可能有一个或多个数据字段为null的空节点,空节点虽然没有数据,但是对高效地保持链表的连接状态至关重要。另一方面,节点的数据字段和next字段值的变化有很强的规律性:数据字段在节点入队列是不为null,出队列时变为null;next字段入队列时是null,出队列时先保持不变,再指向自身。这种规律性可以有效规避掉cas操作的ABA问题。
上面这些也可以认为是ConcurrentLinkedQueue实现算法的不变式,有些在源码注释里面就有说明,有些是我总结的。带着这些原则才能理解每个方法的实现为什么是这样的。
二、内部节点类
直接上源码吧,由于Node提供一个比较简单的节点数据结构,逻辑不多,但是我在读的过程中还是有一些疑惑,直接将解释写在下面的源码块里面了。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
为什么要使用UNSAFE.putObject而不直接赋值呢?刚看到“relaxed write”这个注释时一脸懵逼,
后来才搞明白,因为item是volatile修饰的,如果直接赋值,会触发volatile的内存同步语义,
在初始化阶段不需要如此,所以使用UNSAFE.putObject能提高一些性能。
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
cas操作设置数据字段,这个操作在出队列操作时,保证并发安全。
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
这个操作相比UNSAFE.putObject,会一定程度禁止指令重排,
相比volatile赋值,不保证全局可见性,性能也稍好一些,用于节点出队列后断开链接,
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
给节点设置next字段,入队列的关键操作,cas操作保证并发安全
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
}
三、出队列:poll方法
poll的目标是安全地取出第一个有效节点的数据,如果没有返回null。
源码如下,为了方便分析,对关键行,我加上了L1~Ln这样的行标记 。
public E poll() {
restartFromHead:
for (;;) {
L1 for (Node<E> h = head, p = h, q;;) {
E item = p.item;
L2 if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
L3 if (p != h) // hop two nodes at a time
L4 updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
L5 else if ((q = p.next) == null) {
L6 updateHead(h, p);
return null;
}
L7 else if (p == q)
continue restartFromHead;
else
L8 p = q;
}
}
}
- L1行:开始一个类似遍历的for循环,初始化h,p为head节点,p是当前检查的节点,q是下一个节点。
- L2行:如果p节点的数据不为空,那么p肯定就是真正的第一个节点,只要能将节点的数据字段(item)置为null,出队列就算成功(参考第一节总结的节点属性);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。
L2行如果执行成功,那么出队列可以算是完成了,此时的head指向一个空节点,这种情况是允许的,但是队列里面也不能总是留很多空节点,所以L3L4行,就是把head指针后移。
- L3行:p==h,说明刚出队列的p就是遍历开始的第一个节点,有2种情况,1)没有并发poll,head指向了空节点,队列里面只有且仅有一个空节点,此时没有必要做head移动;2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
- L4行:如果p.next==null,说明p是尾节点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
- L5行:无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了(参考第一节总结的节点属性),此时将head置为尾节点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
- L7行:p==q实际上就是p==p.next,也即p的next指向自身,这说明p节点已经和链表断开了(参考第一节总结的节点属性),这种情况下,只能重新开始poll。
- L8行:正常迭代下一个节点。
上面L2~L4行,每个线程在成功poll一个节点之后,就会尝试把头结点移到p的下一个节点,这样保证了head后面最多只有一个空节点。L3行的那个判断如果没有,也不会影响程序的正确性,但是性能会差一些。
再介绍一下更新头指针的updateHead方法:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
将头指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集,这个操作也维护了第一节总结的节点属性。值得注意的是h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到L5行时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么L7行的判断也可能失败,最终执行到L8行,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为,作者经过测量,认为这种情况造成的性能损失低于volatile造成的性能损失)。
还有一个有意思的现象是,在poll的过程中,并不会去修正TAIL指针,所以在链表的TAIL指针是有可能落在HEAD之后的,甚至暂时指向一个已经出队列的节点。
四、入队列:offer方法
offer的目标是将新节点插入到尾节点的后面,即使在并发情况下也不丢失数据。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
L1 for (Node<E> t = tail, p = t;;) {
L2 Node<E> q = p.next;
L3 if (q == null) {
// p is last node
L4 if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
L5 if (p != t) // hop two nodes at a time
L6 casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
L7 else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
L8 p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
L9 p = (p != t && t != (t = tail)) ? t : q;
}
}
- L1行:开启一个从tail指针开始遍历的循环,p指向当前疑似尾节点;
- L2行:初始化q=p的下一个节点;
- L3行:如果q==null,说明此刻p确实是尾节点,新节点应该插到后面;
- L4行:casNext执行安全的插入操作;
如果成功插入,那么应该考虑更新tail指针,L5L6行执行这个操作; - L5行:和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个节点;
- L6行:cas操作安全移动tail指针。
后面的两个if分支说明p不是尾节点。 - L7行:p==q,也即p的next字段指向自身,说明p是一个脱离了链表的节点(并发poll操作造成的,参考第一节总结的节点属性)),需要找一个节点重新开始遍历。
- L8行:从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
- L9行:按道理直接p=q跳到下一个节点就Ok了,但是代码里面做了这个判断
(p != t && t != (t = tail))
,如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。(作者应该经过测量,可见对性能的压榨已经丧心病狂了)。
移除操作:remove
再来看移除操作,通过数据字段的比较,来移除数据相等的节点。
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
L1 if (item != null) {
L2 if (!o.equals(item)) {
L3 next = succ(p);
L4 continue;
}
L5 removed = p.casItem(item, null);
}
L6 next = succ(p);
L7 if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}
- L1行:判断当前是不是空节点。
- L2行:如果节点数据与输入参数相等,说明这是一个需要移除的节点,否则应该跳到下一个节点;
- L3/4行:节点数据不相等,取出当前节点p的下一个节点,然后重新开始循环;
- L5行:节点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
- L6行:执行到这一行说明,p是空节点,(本来就是空,或在L5被清空);取出next节点;
- L7行:由于p是空节点,尝试将p的前继和后继相连;
上面再取p的下一个节点的时候,用了调用了succ这个方法,这里也看一下:
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
很简单,如果p的next指向自己,说明p已经脱离链表,此时返回head指向的节点。
源码可以看出,如果队列里面有多个相同数据的节点,一次remove调用最多删除一个。
迭代器
private class Itr implements Iterator<E> {
L1 private Node<E> nextNode;
L2 private E nextItem;
L3 private Node<E> lastRet;
Itr() {
L4 advance();
}
private E advance() {
L5 lastRet = nextNode;
L6 E x = nextItem;
Node<E> pred, p;
L7 if (nextNode == null) {
L8 p = first();
pred = null;
} else {
pred = nextNode;
L9 p = succ(nextNode);
}
for (;;) {
L10 if (p == null) {
nextNode = null;
nextItem = null;
return x;
}
E item = p.item;
L11 if (item != null) {
nextNode = p;
nextItem = item;
return x;
} else {
// skip over nulls
L12 Node<E> next = succ(p);
L13 if (pred != null && next != null)
L14 pred.casNext(p, next);
L15 p = next;
}
}
}
public boolean hasNext() {
L16 return nextNode != null;
}
public E next() {
if (nextNode == null) throw new NoSuchElementException();
L17 return advance();
}
public void remove() {
L18 Node<E> l = lastRet;
if (l == null) throw new IllegalStateException();
// rely on a future traversal to relink.
L19 l.item = null;
L20 lastRet = null;
}
}
源代码稍微长点,我删掉了注释。
- L1行:成员变量nextNode指向下一个节点,用来支持hasNext,next等操作;
- L2行:成员变量nextItem是下一个节点的值,即nextNode的item值;为什么单独需要这个变量呢,因为由于并发,nextNode的item字段有可能被置空,而迭代器在hasNext的时候是不允许返回空值的,所以在迭代到nextNode的时候,立即取出item字段保存起来。
- L3行:成员变量lastRet指向上一个节点,用来支持删除操作;
- L4行:构造函数仅调用advance方法,从名字上推测,应该是将迭代器往后推一步;
- L5行:核心方法advance的第一行,先保存lastRet,因为这个方法执行完了以后,nextNode就指向下一个节点了;
- L6行:临时变量x保存nextItem,因为advance执行完以后,nextItem就变成下一个节点的值了;
- L7行:如果nextNode==null,说明这是初始化操作;
- L8行:直接调用p=first(),将p赋值成第一个有效节点或null(队列空的);
- L9行:不是初始化调用,那么p赋值成nextNode的后继节点;
至此p保存了nextNode的备选值 - L10行:如果p==null,说明已经到了链表尾部了,迭代结束;
- L11行:如果p.item!=null,说明p是有效节点,ok,就是它了;
- L12~L15行:如果p.item==null,说明p是无效节点,我们应该跳过它,继续寻找;
- L13~L14行:如果p是空节点,我们尝试将它的前继与后继节点相连;
- L16行:hasNext()方法的实现,就看nextNode是否空;
- L17行:next()方法,很简单就是调用advance;
- L18~L20行:remove()方法,将lastRet指向节点的数据清空;因为当我们调用next方法时,返回的值对应的是lastRet节点。
从上面的实现我们可以知道,在迭代过程中,如果有数据并发入队列,这些数据是可以被迭代到的;如果有值被并发删除或出队列,那么这些数据有可能也被迭代到。尤其是迭代器的remove方法,删除的有可能是一个不存在的数据。
队列长度
ConcurrentLinkedQueue的size方法是很没效率的的,实际是把队列遍历了一遍来计算长度。所以推荐大家使用isEmpty来判断队列是否为空,而不要使用size()==0。
public boolean isEmpty() {
return first() == null;
}
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
为什么ConcurrentLinkedQueue内部不维护一个size变量来跟踪队列的长度呢?不是不想,是做不到,由于ConcurrentLinkedQueue使用无锁化设计,通过cas操作来保证并发安全。而cas操作只能保证单个变量的并发安全性,无法在出入队列操作的同时,维护size变量。
小结
上面基本讲解了ConcurrentLinkedQueue源代码的大部分核心内容。在开篇的时候也讲过,高效的队列算法不是一朝一夕的出来的,看似短短一两百行代码,是经过前人大量的研究才得到的;因此我们读起来有些吃力是理所当然的,如果不去读一读相关的论文及其他资料,那就更困难了。ConcurrentLinkedQueue的代码在深入挖掘jvm特性的基础上,做了不少性能优化,比如offer方法的L8L9行;当然,上文的解释是我的推测,不一定正确,如果大家发现谬误之处,欢迎指正。