ConcurrentLinkedQueue是一个lock-free的非阻塞式线程安全的同步队列,其中freelock算法是值得让人思考和深究的;
Michael & Scott算法
因为ConcurrentLinkedQueue
是在Michael & Scott算法(论文)的基础上做了一些修改的,所以先可以了解下该算法的原理;
背景
并发FIFO队列广泛应用于并行应用程序和操作系统中。为了确保数据的正确性,必须同步对共享队列的并发访问。通常,并发数据结构(包括FIFO队列)的算法分为两类:阻塞和非阻塞;
阻塞算法允许一个缓慢或延迟的进程,以防止更快的进程无限期地对共享数据结构的进行操作。
非阻塞算法保证,如果有一个或多个活动线程试图在共享数据结构上执行操作,某些操作将在有限的时间和步长内完成。
在异步(尤其是多进程)多处理器系统上,当进程在不适当的时候停止或延迟时,阻塞算法会导致显著的性能下降。延迟的可能来源包括处理器调度抢占、页面错误和缓存丢失。面对这些事件,非阻塞算法更加健壮。
原理
该算法是一个FIFO的、带有一个head和tail节点单链表队列结构,Head总是指向一个虚拟节点,它是列表中的第一个节点;Tail指向列表中的最后一个节点或倒数第二个节点;因为是lock-free,使用CAS代替原本的加锁逻辑,同时采用修改程序计数器的方式来避免CAS造成的ABA问题(并不是CAS的语义造成的ABA问题,而是伴随着CAS操作的上下文可能会存在ABA问题);
该算法可以保证安全性,因为满足以下一些性质:
- 链表是连续的;
- 新节点只在链表的最后一个节点之后插入,出队时只在链表的头节点开始移除;
- Head始终指向链表中的第一个节点,tail总是指向链表的某一个节点;
有了上面几个特性,就可以推导出以下几个结论
链表总是连接的,因为一旦插入了一个节点,它的下一个指针在释放之前不会被设置为NULL,并且一直不会释放该节点直到从队头删除。
在无锁定算法,节点只会被插入到链表的末尾,因为他们是通过tail指针链接,tail总是指向链表中的某一个节点,一个新节点只会被添加到某个next域为null的节点,通常这个节点是链表的最后一个;
节点从列表的头部被删除,因为它们只有在被Head指向时才会被删除,Head总是指向列表中的第一个节点。
Head始终指向列表中的第一个节点,并且它会自动地将其值更改为下一个节点(使用Head锁或使用cas)。当这种情况发生时,它用来指向的节点被认为已从列表中删除。Head的新值不能为空,因为如果链表中有一个节点,dequeue操作将返回但不会删除任何节点。
Tail总是指向链表中的一个节点,并且它从不落后于Head,所以它永远不会指向被删除的节点。另外,当Tail改变它的值时,它总是移动到列表中的最后一个节点,如果下一个指针为空,则它被移动。
initialize(Q: pointer to queue_t)
// 初始化一个虚拟节点并将head和tail指向它
node = new_node()
node->next.ptr = NULL
Q->Head.ptr = Q->Tail.ptr = node
enqueue(Q: pointer to queue_t, value: data type)
// 创建即将入队的新节点
E1: node = new_node()
E2: node->value = value
E3: node->next.ptr = NULL
// 无限循环直到入队成功
E4: loop
E5: tail = Q->Tail
E6: next = tail.ptr->next
E7: if tail == Q->Tail
E8: if next.ptr == NULL
// 将新节点入队
E9: if CAS(&tail.ptr->next, next, <node, next.count+1>)
E10: break // 完成入队,退出循环
E11: endif
E12: else // tail并没有指向最后一个节点
// 重新将tail节点指向最后一个节点,重新循环
E13: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
E14: endif
E15: endif
E16: endloop
// 入队成功,试着将tail指向到新的插入节点上,可以允许失败
E17: CAS(&Q->Tail, tail, <node, tail.count+1>)
dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
D1: loop // 循环直到有节点出队
D2: head = Q->Head
D3: tail = Q->Tail
D4: next = head.ptr->next
D5: if head == Q->Head
D6: if head.ptr == tail.ptr
// 队列为空或者tail节点在head节点前面
D7: if next.ptr == NULL
D8: return FALSE // 队列是空的,返回
D9: endif
// head节点落后于tail节点,尝试将tail移到最后的位置
D10: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
D11: else // 出队只需处理head节点,tail节点不管
// CAS之前读取值,因为其他线程可能会释放该节点
D12: *pvalue = next.ptr->value
// 将head节点指向已经出队的节点,将其作为新的哨兵节点
D13: if CAS(&Q->Head, head, <next.ptr, head.count+1>)
D14: break // 出队完成,退出循环
D15: endif
D16: endif
D17: endif
D18: endloop
D19: free(head.ptr) // 释放head节点的ptr,置为null
D20: return TRUE
有了这些规则定义后,可以来分析算法的源码了;
入队
只有当E7、E8中的条件失败或E9中的CAS失败时,队列操作才会循环;
在执行了E5之后,如果当前线程失去了CPU的执行权(比如时间片到了),其他线程修改了tail节点,那么E7行中的条件就会失败,因此如果E7行失败不止一次,那么一定存在另一个线程完成了入队操作;
如果Tail节点指向链表中倒数第二个节点,则E8行判断将失败,在E13行进行CAS后,tail必须指向列表中的最后一个节点,除非某个线程成功的插入了一个新的节点,因此第E8行的判断失败了不止一次,那么一定存在另一个线程完成了入队操作;
只有当另一个线程成功地将一个新节点加入队列时,第E9行中的CAS操作才会失败。出队
只有当D5行中的条件失败、D6行中的条件保持不变(且队列不是空的)或D13行中的CAS失败时,dequeue操作才会循环。
D5行中的条件和D13行中的cas只有在另一个线程修改Head时才会失败,只有当一个线程成功地使一个Node退出队列时,才会修改Head。
只有当Tail指向链表中倒数第二个节点时,D6行的条件才会成功(且队列不是空的)。
在D10行进行CAS之后,Tail必须指向列表中的最后一个节点,除非另一个线程成功的将一个新节点入队。因此,如果D6行的条件成功了不止一次,那么一定存在另一个线程完成了一个入队操作或者出队操作;
了解Michael & Scott算法之后,就可以对比ConcurrentLinkedQueue的异同;
特性
ConcurrentLinkedQueue
是在Michael&Scott算法的基础上做了一个修改,适用于GC的环境,由于不存在ABA的问题(因为就算是两个A,但是对于节点来说内存地址不一样,所以不存在ABA问题),所以去掉了引用计数;
基本不变的规则有以下几点:
- 只有一个(最后一个)节点具有
null
的next引用(即新节点永远都从最后一个next为null的节点上添加的),它在排队时被cas。最后一个节点可以在O(1)时间内从tail节点到达,但这仅仅是一个优化——它也总是可以在O(N)时间内从head到达同一个节点。 - 所有被队列中包含的非空元素是可以通过head的访问的,通过CAS自动地将item为null的节点从队列中删除。head到所有item不为null的可达性必须为true,即使在并发条件下的修改而导致head的移动的情况下也是如此;对于创建了一个迭代器,或者仅仅是一个丢失了时间片的poll(),离开队列的节点可能会被无限期地继续使用。
上面的规则可能暗示了从一个被删除的节点开始所有的节点都是gc可达的,这将导致两个问题:
- 允许一个迭代器造成无限的内存占用
- 如果一个节点是活的,则会导致旧节点与新节点的跨代连接,而分代gc很难处理这一问题,从而导致重复的集合问题
为了解决这个问题,代码中使用了一个技巧,将所有被移除的节点的next节点都指向了自己本身,这可以隐含的告诉GC节点已经是非可达的了,从而可以被GC回收;
同时Head
和Tail
节点都是允许滞后更新的,这是一个很重要的优化,可以减少更新头尾节点的竞争提高并发性,最多允许落后一个节点的步长,当有两个或以上的步长时会更新头尾节点;
debug的时候用IDEA时遇到debugger打印逻辑的错误,一开始被IDEA给误导了半天,很多代码解释不通,最后采用的办法是将代码copy出来放在一个新的类里面就可以输出正确的debugger结果了;
// head节点可以在O(1)时间内到达第一个未被删除的节点(如果有的话)的节点
// 不变性:
// - 链表中所有的节点都可以从head中访问到
// - head != null
// - (tmp = head).next != tmp || tmp != head;代表head的next指针不可能指向自己
// 可变性:
// - head.item可是null或非null
// - 允许tail落后于head,也就是说从head不可达到tail
private transient volatile Node<E> head;
// 列表中最后一个节点(也就是node.next == null的那个),可以从tail指针
// 以O(1)的时间到达
// 不变性:
// - 最后一个节点永远可以通过succ()从tail访问
// - tail != null
// 可变性:
// - tail.item可是null或非null
// - 允许tail落后于head,也就是说从head不可达到tail
// - tail.next 允许指向自己
private transient volatile Node<E> tail;
//
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
初始化的时候会创建一个哨兵节点,head和tail指向哨兵节点;
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // loop
Node<E> q = p.next;
if (q == null) {
// p是队列结尾
if (p.casNext(null, newNode)) { // CAS成功
// 完成入队
if (p != t)
// 如果tail距离尾节点的步长为2,更新tail
casTail(t, newNode); // 失败也是ok的,意味着有其他线程更新成功
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
先假设单线程环境下的入队操作,当tail节点指向队尾节点时,只需要通过一次CAS就可以入队成功;
可以看到,对比Michael&Scott算法的入队操作,在更新尾节点的频率上是更低频次的;
最多允许tail节点落后一个节点的步长;现在可以理解p = (p != t && t != (t = tail)) ? t : q;
这句话了;
// 走到这里代表p.next != null且p != p.next
p = (p != t && t != (t = tail)) ? t : q;
// 可以翻译成下面的代码
if (p == t) {
// 1
// 意味着tail节点没有被其他线程修改,同时tail节点指向的并不是队列最后一个节点
p = q;
} else {
// 2
// 并发情况下就有可能走到这里
// p.next != null && p != p.next && p != t
Node<E> tmp = t;
t = tail;
if (tmp != t) {
p = t; // tail被其他线程修改了,重新将p指向tail
} else {
p = q; // 指向next所指的节点
}
}
在通常情况下(比如上面的图2场景),tail节点并不是指向最后一个节点的时候,就有可能进入到1的场景中,这个还是比较好理解的;
就是场景2里面,只有在并发入队的条件下才会有可能走到2,比如第一次走到场景1中,将q赋值给p,这时候p就是指向了原本p.next
节点,但是在下一个p.casNext(null, newNode)
失败的情况下(意味着有一个其他的线程成功入队了),那么还是会在下一次loop中重新回到p = (p != t && t != (t = tail)) ? t : q;
,并且这一次就是重新走到场景2;
public E poll() {
restartFromHead:
for (;;) { // loop
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 如果p的item不为null且cas成功就算是成功出队
if (p != h) // 和入队一样,出队也是允许head不是即时更新的
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 走到这里意味着head的item为null,或者说其他线程出队成功
// 而导致的cas失败
else if ((q = p.next) == null) {
// 队列已经为空的了,更新head节点并返回null
updateHead(h, p);
return null;
}
else if (p == q)
// q = p.next && p == q
// 意味着p和q指向的节点被移出了队列
continue restartFromHead;
else
// 下一个元素不为null && p并不等于p.next
// p指向下一个出队的元素
p = q;
}
}
}
同样出队的时候也是滞后更新head节点,它只需要保证head节点一定可以通过succ()
访问到后继节点就行了;
final void updateHead(Node<E> h, Node<E> p) {
// 通过CAS将head节点的值设置为下一个不为null的节点
if (h != p && casHead(h, p))
// 同时将节点的next指向自己
h.lazySetNext(h);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
可以看到并不是每一次出队都会更新head节点,只有在步长大于1的时候才会移动head指针,同时把之前head节点的next域指向自己,将节点和链表之间的连接断开;
UNSAFE.putOrderedObject
方法的作用是优化对volatile
写的开销,因为为了保证volatile
的可见性语义,在每个volatile写操作后插入StoreLoad
屏障,通常来说该屏障是需要昂贵的开销才能完成,而putOrderedObject
的是使用StoreStore
屏障,使用的结果就是写入数据可能并不会被其他线程马上看到,通常是几纳秒后被其他线程看到,所以是叫lazySetNext
;
最后再回到offer方法的这个判断情况
// q != null, q = p.next
else if (p == q)
// 如果p == q,那么意味着p的next指向的是自己
p = (t != (t = tail)) ? t : head;
// 可以翻译成下面的代码
Node<E> tmp = t;
t = tail;
if (tmp != t) {
// t不等于tail,意味着有新的节点入队,重新指向tail节点
p = t;
} else {
// 当前p指向的节点已经被其他线程移除队列,所以不能访问后续的节点
// 将p指向head节点重新回到队列上面
p = head;
}
如果p == q
这个判断条件成立,那么意味着p的next域指向的是自己,这种情况只会在上面的poll()
方法中的updateHead
把节点移出队列时才会把next域指向自己;所以说当前的p节点实际上已经脱离了队列,那么就需要将其重新指向队列中的某个节点;