ConcurrentLinkedQueue是一个基于链表结构的无界队列,提供了Queue的基本特性FIFO,出入规则是:从head出,从tail进。非阻塞特性使其在高并发环境依然能有出色的性能。
ConcurrentLinkedQueue的基础数据结构:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
。。。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
。。。
}
下面是ConcurrentLinkedQueue的一些需要注意的点:
1.不允许存储null值,否则抛出NPE
2.Iterator遍历时数据的弱一致性
3.size方法没有绝对的实时性
4.没有fast-fail机制,不会抛出ConcurrentModificationException
5.head,tail节点的延迟更新处理
6.无锁化设计的ABA问题
1.不允许存储null值,否则抛出NPE
每次offer操作时都需要进行checkNotNull操作,若item为null,直接跑NPE.
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
2.Iterator遍历时数据的弱一致性 3.size方法没有绝对的实时性 4.没有fast-fail机制
前两个问题都是由于ConcurrentLinkedQueue的异步特性造成的,在遍历时无法保证队列不会被其他线程修改.并且ConcurrentLinkedQueue没有fast-fail机制,即在遍历队列时,队列被别的线程修改并不会抛出ConcurrentModificationException。所以当前线程对于其他线程做的修改不能及时感知到。除了Iterator遍历/size()方法,其他所有批量操作/全局操作的方法都存在这个问题(如:toArray/addAll等)
5.head,tail节点的延迟更新处理
这点是最初接触ConcurrentLinkedQueue时比较困惑的地方,head和tail节点并不是实时更新的,也就是说每次offer/poll操作并不一定改变head/tail的值,这种处理方式可以减少入队出队时的cas操作,对性能是个很大的提升。下面结合offer和poll操作来具体分析下:
执行操作:初始化
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
队列状态
执行操作:queue.offer("aaa");
队列状态
执行操作:queue.offer("bbb");
队列状态
执行操作:queue.offer("ccc");
队列状态
执行操作:queue.offer("ddd");
队列状态
下面根据offer源码具体分析:
offer操作源码:
public boolean offer(E e) {
checkNotNull(e);//NPE检查
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
(1) if (q == null) {
// p是最后一个节点,cas更新
if (p.casNext(null, newNode)) {
//cas更新成功
if (p != t)
casTail(t, newNode); //cas更新tail
return true;
}
// cas操作失败,重复操作
}
(2) else if (p == q)
p = (t != (t = tail)) ? t : head;
(3) else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
offer操作可以分为两个步骤:1.确定尾节点 2. 做cas更新
从上面的状态图中可以看到,tail指向的节点并不一定是队列的尾节点,所以要做元素入队操作的第一要务是确定尾节点.
重点分析下为什么q 会出现以上三种情况:
- q == null
当前的队列状态1
执行操作:queue.offer("ccc");
当tail指向队列的最后一个节点bbb的时候,tail.next为null,此时q 为 null,执行cas更新节点的操作,p.casNext(null, newNode),将p的next值设置为newNode,等同于p.next = newNode;
队列的状态2如下:
问题:为什么需要做下面这个判断?这个cas操作失败了会有什么后果?
if (p != t)
casTail(t, newNode);
下面继续分析
在上面队列状态基础上执行操作:queue.offer("ddd");
此时 p = tail,q = p.next; p指向bbb,q指向ccc。状态如下:
这种情况下 q != null 且 p != q,所以执行(3)操作:
p = (p != t && t != (t = tail)) ? t : q;
这步操作后 p 指向了 节点ccc。即p指向了尾节点。尾节点确定了,那么下次循环即可做cas更新操作了。
得到最新状态3:
在队列状态2【即执行queue.offer("ccc")后】的基础上做如下操作:
在并发环境中,当线程A执行操作queue.offer("ddd");之前,同时另一个线程执行完以下三次poll操作:
queue.poll();
queue.poll();
queue.poll();
此时线程A看到的队列的状态如下:
所以当 p == q时,表示当前节点已经不再链表中(已被移除)。所以这种情况下需要将p重新指向head或tail。
p = (t != (t = tail)) ? t : head;
至此,offer操作中的三种情况分析完毕。
下面分析poll操作。
当前状态
执行操作:queue.poll();
状态如下:
继续执行操作:queue.poll();
状态如下:
继续执行操作:queue.poll(); 状态如下
可见head的更新和tail类似,都是采用了延迟更新的优化方式。
poll源码如下:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 成功将item置为null
if (p != h) // 更新head
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
poll操作和offer类似,先获取head,然后cas(依情况决定是否更新head),一般hop为2。
6.无锁化设计的ABA问题
在用cas做lock-free操作时有一个经典的ABA问题。那么ConcurrentLinkedQueue需要考虑这个问题吗?
ConcurrentLinkedQueue中不存在ABA问题,这主要依赖于Java语言的垃圾回收机制。当一个节点被poll或remove后,即被gc,该节点会被垃圾回收器回收。
使用场景
- Tomcat存储等待请求
protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests =
new ConcurrentLinkedQueue<SocketWrapper<Socket>>();
ConcurrentLinkedDeque
JDK还提供了一个双端队列的实现版本。并发操作特性和ConcurrentLinkedQueue相似,提供了Deque接口特征。
参考: jdk1.7 java.util.concurrent.ConcurrentLinkedQueue源码