简介
ConcurrentLinkedQueue 是线程安全的非阻塞队列,内部是单向链表。ConcurrentLinkedQueue 使用CAS机制保证线程安全,而LinkedBlockingQueue使用两个lock保证线程安全,ConcurrentLinkedQueue 性能比LinkedBlockingQueue高很多。需要注意的是ConcurrentLinkedQueue 的头节点和尾节点都具有滞后性,直接读取不应当准确,不能直接使用,一般需要遍历。
ConcurrentLinkedQueue 类
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
继承AbstractQueue,并实现Queue接口
重要内部类 Node
private static class Node<E>
Node 属性
// 元素
volatile E item;
// 下一个元素
volatile Node<E> next;
// 内存操作,不安全类
private static final sun.misc.Unsafe UNSAFE;
// 内存操作当前元素偏移量
private static final long itemOffset;
// 内存操作下个元素偏移量
private static final long nextOffset;
Node 初始化UNSAFE
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
UNSAFE 只能通过反射进行初始化
Node 构造函数
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
Node 方法
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
Node 中用到了Java底层不安全类sun.misc.Unsafe,它可以像C++中一样直接操作内存,平时开发中没把握时一定不要用。
ConcurrentLinkedQueue 属性
// 头节点
private transient volatile Node<E> head;
// 尾节点
private transient volatile Node<E> tail;
// 同上
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
ConcurrentLinkedQueue 构造函数
默认初始化
public ConcurrentLinkedQueue() {
// 初始化头节点和尾节点(null节点)
head = tail = new Node<E>(null);
}
使用线性集合初始化
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
// 遍历线性集合
for (E e : c) {
// 为空抛异常
checkNotNull(e);
// 构建节点
Node<E> newNode = new Node<E>(e);
// 设置头节点
if (h == null)
h = t = newNode;
else {
// 设置下级节点和当前节点
t.lazySetNext(newNode);
t = newNode;
}
}
// 头节点为空构建空节点
if (h == null)
h = t = new Node<E>(null);
// 头节点和尾节点赋值
head = h;
tail = t;
}
ConcurrentLinkedQueue 基础方法
修改头节点
final void updateHead(Node<E> h, Node<E> p) {
// 头节点修改为p
if (h != p && casHead(h, p))
// 原头节点的下级节点修改为自己
h.lazySetNext(h);
}
刚开始忘记看这个方法,到账后面p==q完全看不懂 [捂脸]
获取下级节点
final Node<E> succ(Node<E> p) {
// 获取p下级节点
Node<E> next = p.next;
// 如果p为原来头节点(updateHead),p下级指向新head
return (p == next) ? head : next;
}
在看这个方法之前一定要先updateHead,否则p==next让你欲仙欲死
从链表中获取头节点
Node<E> first() {
restartFromHead:
for (;;) {
// 尝试从head开始遍历
for (Node<E> h = head, p = h, q;;) {
// p.item 是否为空
boolean hasItem = (p.item != null);
// 元素为空或者p.next为空
if (hasItem || (q = p.next) == null) {
// 修改头节点
updateHead(h, p);
// 头节点元素不为null返回p
return hasItem ? p : null;
}
// 刚好取到的head是原来的头节点重新查找
else if (p == q)
continue restartFromHead;
else
// p 向后移
p = q;
}
}
}
if (hasItem || (q = p.next) == null) 只要节点元素不为空就可以进去,元素为空下级节点不为空,后走到最后的else
队列是否为空
public boolean isEmpty() {
return first() == null;
}
转数组
public Object[] toArray() {
// 创建ArrayList
ArrayList<E> al = new ArrayList<E>();
// 遍历所有元素
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null)
// 向list中添加元素
al.add(item);
}
// list转数组
return al.toArray();
}
ConcurrentLinkedQueue 添加
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 检查是否为空
checkNotNull(e);
// 构建新对象
final Node<E> newNode = new Node<E>(e);
// 从尾节点向后遍历
for (Node<E> t = tail, p = t;;) {
// 获取尾节点的下一个节点
Node<E> q = p.next;
if (q == null) {// p就是尾节点
// next为空就设置为新节点
if (p.casNext(null, newNode)) {// 成功
// 首次添加不指定新的尾节点
if (p != t)
// 尾节点为t,就把尾节点设置为新节点
casTail(t, newNode);
return true;
}
}
// 头节点移除后调用updateHead方法
// updateHead会把原来头节点下级指向自己
else if (p == q)
// 从头走
p = (t != (t = tail)) ? t : head;
else
// 推动p节点后移
p = (p != t && t != (t = tail)) ? t : q;
}
}
ConcurrentLinkedQueue 出队
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取头节点值
E item = p.item;
// 头节点不为null,先把头节点置为null
if (item != null && p.casItem(item, null)) {
// 第一次操作成功p=h,后面线程修改头节点
if (p != h)
// 修改头节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 没获取到锁或者为空
else if ((q = p.next) == null) {
// 下级节点为空修改头节点
updateHead(h, p);
return null;
}
// 头节点移除后调用updateHead方法
// updateHead会把原来头节点下级指向自己
else if (p == q)
continue restartFromHead;
else
// p指向下级节点
p = q;
}
}
}
ConcurrentLinkedQueue 查询
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 判断头节点元素是否为空
if (item != null || (q = p.next) == null) {
// 修改头节点
updateHead(h, p);
return item;
}
else if (p == q)
// 从头再来
continue restartFromHead;
else
// 向下找
p = q;
}
}
}