JDK中,提供了一个ConcurrentLinkedQueue类来实现高并发的队列。这个队列使用链表作为其数据结构。这个类算是高并发环境中性能最好的队列。高性能是因为其内部复杂的实现。
CAS
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
ConcurrentLinkedQueue是一种线程安全的队列。他是使用非阻塞算法(CAS)来实现线程安全的。ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。
ConcurrentLinkedQueue结构
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点
private static class Node<E> {
private volatile E item; // 声明为 volatile 型
private volatile Node<E> next; // 声明为 volatile 型
Node(E item) { // 创建新节点
lazySetItem(item); // 惰性设置 item 域的值
}
E getItem() {
return item;
}
boolean casItem(E cmp, E val) { // 使用 CAS 指令设置 item 域的值
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void setItem(E val) { // 使用“volatile 写”的方式,设置 item 域的值
item = val;
}
void lazySetItem(E val) { //惰性设置 item 域的值
UNSAFE.putOrderedObject(this, itemOffset, val);
}
void lazySetNext(Node<E> val) { // 惰性设置 next 域的值
UNSAFE.putOrderedObject(this, nextOffset, val);
}
Node<E> getNext() {
return next;
}
//CAS 设置 next 域的值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE= // 域更新器
sun.misc.Unsafe.getUnsafe();
private static final long nextOffset= //next 域的偏移量
objectFieldOffset(UNSAFE, "next", Node.class);
private static final long itemOffset= //item 域的偏移量
objectFieldOffset(UNSAFE, "item", Node.class);
}
其实这是一个静态内部类,item是来表示元素的。next表示当前Node的下一个元素。
对Node进行操作时,使用了CAS操作:
//表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//和上个方法类似,是用来设置next字段。
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
ConcurrentLinkedQueue内部有两个重要的字段:head和tail,分别表示链表的头部和尾部,当然都得是Node类型。对于head来说,它永远不会为null,并且通过head和succ()后继方法一定能完整地遍历整个链表。对于tail来说,它自然表示队列的末尾。
以下是构造函数:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
但是ConcurrentLinkedQueue的内部实现非常复杂,它允许在运行时链表处于多个不同的状态。以tail为例,一般来说我们期望tail总是作为链表的末尾,但实际上tail的更新并不是及时的,可能会产生拖延现象。就是说,有可能指向的并不是真正的尾巴,真正的可能在后面一个或者多个。
再来看看Node类的结构:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
.......
}
Node节点主要包含了两个域:一个是数据域item,另一个是next指针,用于指向下一个节点从而构成链式队列。并且都是用volatile进行修饰的,以保证内存可见性
另外ConcurrentLinkedQueue含有这样两个成员变量:
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
说明ConcurrentLinkedQueue通过持有头尾指针进行管理队列。当我们调用无参构造器时,其源码为:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
head和tail指针会指向一个item域为null的节点,此时ConcurrentLinkedQueue状态如下图所示:
如图,head和tail指向同一个节点Node0,该节点item域为null,next域为null。
操作Node的几个CAS操作
在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。
可以看出在处理器指令集能够支持CMPXCHG指令后,在java源码中涉及到并发处理都会使用CAS操作,那么在ConcurrentLinkedQueue对Node的CAS操作有上面提到的这几个:
//更改Node中的数据域item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
可以看出这些方法实际上是通过调用UNSAFE实例的方法,UNSAFE为sun.misc.Unsafe类,该类是hotspot底层方法,目前为止了解即可,知道CAS的操作归根结底是由该类提供就好。
节点类型说明
有效节点:从 head 向后遍历可达的节点当中,item 域不为 null 的节点。
无效节点:从 head 向后遍历可达的节点当中,item 域为 null 的节点。
已删除节点:从 head 向后遍历不可达的节点。
哨兵节点:链接到自身的节点(哨兵节点同时也是以删除节点)。
头节点:队列中的第一个有效节点(如果有的话)。
尾节点:队列中 next 域为 null 的节点(可以是无效节点)。
在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式 ( 基本不变式,head 的不变式和 tail 的不变式 ),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性
基本不变式
在执行方法之前和之后,队列必须要保持的不变式:
- 当入队插入新节点之后,队列中有一个 next 域为 null 的(最后)节点。
- 从 head 开始遍历队列,可以访问所有 item 域不为 null 的节点。
head 的不变式和可变式
在执行方法之前和之后,head 必须保持的不变式:
- 所有“活着”的节点(指未删除节点),都能从 head 通过调用 succ() 方法遍历可达。
- head 不能为 null。
- head 节点的 next 域不能引用到自身。
在执行方法之前和之后,head 的可变式: - head 节点的 item 域可能为 null,也可能不为 null。
- 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
tail 的不变式和可变式
在执行方法之前和之后,tail 必须保持的不变式:
- 通过 tail 调用 succ() 方法,最后节点总是可达的。
- tail 不能为 null。
在执行方法之前和之后,tail 的可变式: - tail 节点的 item 域可能为 null,也可能不为 null。
- 允许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
- tail 节点的 next 域可以引用到自身。
入队列过程
入队列就是将入队节点添加到队列的尾部,在 ConcurrentLinkedQueue 中,插入新节点时,不用考虑尾节点是否为有效节点,直接把新节点插入到尾节点的后面即可。由于 tail 可以指向任意节点,所以入队时必须先通过 tail 找到尾节点,然后才能执行插入操作。如果插入不成功(说明其他线程已经抢先插入了一个新的节点)就继续向后推进。重复上述迭代过程,直到插入成功为止。
/**
* Inserts the specified element at the tail of this queue.
* 在队列后插入一个元素
* As the queue is unbounded, this method will never return {@code false}.
* 因为队列是无限大的,这个方法永远不会回false
*
* @return {@code true} (as specified by {@link Queue#offer})
* 返回true,
* @throws NullPointerException if the specified element is null
* 如果某元素为空则抛出空指针异常
*/
public boolean offer(E e) {
checkNotNull(e);//检查e是否为空
final Node<E> newNode = new Node<E>(e);//创建新节点
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) { //1
// p is last node. p是最后一个节点
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
//每两次,更新一下tail
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试。
}
else if (p == q)//2
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
//遇到哨兵节点,从head开始遍历。
//但如果tail被修改,则使用tail(因为可能被修改正确了)
p = (t != (t = tail)) ? t : head;
else //3
// Check for tail updates after two hops.取下一个节点或者最后一个节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
这个方法没有任何锁操作。线程安全完全由CAS操作和队列的算法来保证。整个算法的核心是for循环,这个循环没有出口,知道尝试成功,这也符合CAS操作的流程。
当第一次加入元素的时候,由于队列为空,因此p.next为null。程序进入1处,程序将p的next节点赋值为newNode,也就是将新的元素加入到队列中。此时p==t成立,也就是不会执行casTail()代码更新tail末尾。如果casNext()成功,程序直接返回,如果失败,则再进行一次循环尝试,直到成功。因此,增加一个元素后,tail并不会被更新。
当程序尝试加入第二个元素时,由于此时t还在head的位置上,因此p.next指向实际的第一个元素,因此1处的q!=null,这表示q不是最后的节点。由于往队列中增加元素需要最后一个节点,因此,循环开始中查找最后一个节点。于是,程序进入3处,获得最后一个节点。此时,p实际上是指向链表中的第一个元素,它的next是null,故在第二个循环的时候,进入1处,p更新自己的next,让它指向新加入的节点。如果成功,由于此时p!=t成功,则会更新t所在位置,将t移到链表最后。
2处处理了p==q的情况。这种情况是由于遇到了哨兵节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点或者空节点。当遇到哨兵节点的时候,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步查找到链表末尾。但一旦发生在执行过程中,tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾,这样就避免了重新查找tail的开销。
- 添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
- 添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
- 添加元素3,设置tail节点的next节点为元素3节点。
- 添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。
入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,在入队列前如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。
上面的分析从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再看看源码来详细分析下它是如何使用CAS方式来入队的(JDK1.8):
public boolean offer(E e) {
checkNotNull(e);
//创建入队节点
final Node<E> newNode = new Node<E>(e);
//t为tail节点,p为尾节点,默认相等,采用失败即重试的方式,直到入队成功
for (Node<E> t = tail, p = t;;) {
//获得p的下一个节点
Node<E> q = p.next;
// 如果下一个节点是null,也就是p节点就是尾节点
if (q == null) {
//将入队节点newNode设置为当前队列尾节点p的next节点
if (p.casNext(null, newNode)) {
//判断tail节点是不是尾节点,也可以理解为如果插入结点后tail节点和p节点距离达到两个结点
if (p != t)
//如果tail不是尾节点则将入队节点设置为tail。
// 如果失败了,那么说明有其他线程已经把tail移动过
casTail(t, newNode);
return true;
}
}
// 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回 head节点
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
//p有next节点,表示p的next节点是尾节点,则需要重新更新p后将它指向next节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
从源代码我们看出入队过程中主要做了三件事情,第一是定位出尾节点;第二个是使用CAS指令将入队节点设置成尾节点的next节点,如果不成功则重试;第三是重新定位tail节点。
从第一个if判断就来判定p有没有next节点如果没有则p是尾节点则将入队节点设置为p的next节点,同时如果tail节点不是尾节点则将入队节点设置为tail节点。如果p有next节点则p的next节点是尾节点,需要重新更新p后将它指向next节点。还有一种情况p等于p的next节点说明p节点和p的next节点都为空,表示这个队列刚初始化,正准备添加数据,所以需要返回head节点。
出队列
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。
从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。让我们再通过源码来深入分析下出队过程(JDK1.8):
public E poll() {
// 设置起始点
restartFromHead:
for (;;) {
//p表示head结点,需要出队的节点
for (Node<E> h = head, p = h, q;;) {
//获取p节点的元素
E item = p.item;
//如果p节点的元素不为空,使用CAS设置p节点引用的元素为null
if (item != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
//如果p节点不是head节点则更新head节点,也可以理解为删除该结点后检查head是否与头结点相差两个结点,如果是则更新head节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//如果p节点的下一个节点为null,则说明这个队列为空,更新head结点
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//结点出队失败,重新跳到restartFromHead来进行出队
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
首先获取head节点的元素,并判断head节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将head节点的引用设置成null,如果CAS成功,则直接返回head节点的元素,如果CAS不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取head节点。如果p节点的下一个节点为null,则说明这个队列为空(此时队列没有元素,只有一个伪结点p),则更新head节点。
队列判空
有些人在判断队列是否为空时喜欢用queue.size()==0,让我们来看看size方法:
public int size()
{
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
可以看到这样在队列在结点较多时会依次遍历所有结点,这样的性能会有较大影响,因而可以考虑empty函数,它只要判断第一个结点(注意不一定是head指向的结点)。
public boolean isEmpty() {
return first() == null;
}
HOPS的设计
通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:
tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。
head更新触发时机当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。
并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的大概原因是这个(猜的 :)),从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?
如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。