[Java] Java 并发包中并发原理剖析之ConcurrentLinkedQueue


 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
public ConcurrentLinkedQueue() {
  head = tail = new Node<E>(null);


private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
            nextOffset = UNSAFE.objectFieldOffset
        } catch (Exception e) {
            throw new Error(e);

注意,像这个包中大多数的非阻塞算法一样,这个实现依赖于,在垃圾收集系统,没有ABA的可能性问题的事实。由于回收节点,因此没有必要使用“数指针”或在版本中使用“ non-GC”设置的相关技术。


  • 假如正好有一个(最后一个)引用为空的节点next,在排队时使用CAS算法。最后一个节点可以在O(1)时间内从tail到达,但tail只是一个优化——它也总是可以在O(N)时间内从head到达。
  • 队列中包含的元素是从head访问的非空节点。通过CAS将节点的引用指向null,自动的会将它从队列中移除。来自head的所有元素的可达性必须保持true,即使在导致head前进的并发修改的情况下也是如此。由于创建了一个迭代器,或者是一个丢失了时间片的poll()操作,一个离开队列的节点可能会无限期地继续使用。

一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

此实现采用了有效的“无等待 (wait-free)”算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。适用于垃圾收集环境,支持内部节点删除(以支持删除(对象))。

需要小心的是,与大多数 collection 不同,size 方法不是 一个固定时间操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。

此类及其迭代器实现了 CollectionIterator 接口的所有可选 方法。

内存一致性效果:当存在其他并发 collection 时,将对象放入 ConcurrentLinkedQueue 之前的线程中的操作 happen-before 随后通过另一线程从 ConcurrentLinkedQueue 访问或移除该元素的操作。

此类是 Java Collections Framework 的成员。


返回值 方法
boolean add(E e) 将指定元素插入此队列的尾部。
boolean contains(Object o) 如果此队列包含指定元素,则返回 true
boolean isEmpty() 如果此队列不包含任何元素,则返回 true
Iterator<E> iterator() 返回在此队列元素上以恰当顺序进行迭代的迭代器。
boolean offer(E e) 将指定元素插入此队列的尾部。
E peek() 获取但不移除此队列的头;如果此队列为空,则返回 null
E poll() 获取并移除此队列的头,如果此队列为空,则返回 null
boolean remove(Object o) 从队列中移除指定元素的单个实例(如果存在)。
int size() 返回此队列中的元素数量。
Object[] toArray() 返回以恰当顺序包含此队列所有元素的数组。
<T> T[] toArray(T[] a) 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。

ConcurrentLinkedQueue 原理介绍


     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
    public boolean offer(E e) {
        final Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    //[6] CAS成功,则说明新增节点已经被放入链表,然后设置当前尾节点(包含head,第
                                        //1, 3 , 5 . . .个节点为尾节点)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                // Lost CAS race to another thread; re-read next
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                                //成了 head,所以这里需要重新找新的head
                p = (t != (t = tail)) ? t : head;
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;

offer 操作时在队列末尾添加一个元素,如果该元素为null,则抛出NPE异常,否则由于ConcurrentLinkedQueue是无界队列,该方法一直会返回true。由于使用CAS无阻塞算法,因此该方法不会阻塞挂起调用的线程。



 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never throw
 * {@link IllegalStateException} or return {@code false}.
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
public boolean add(E e) {
    return offer(e);



public E poll() {
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            else if (p == q)
                continue restartFromHead;
                p = q;



public E peek() {
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            else if (p == q)
                continue restartFromHead;
                p = q;




 * Returns the number of elements in this queue.  If this queue
 * contains more than {@code Integer.MAX_VALUE} elements, returns
 * {@code Integer.MAX_VALUE}.
 * <p>Beware that, unlike in most collections, this method is
 * <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current
 * number of elements requires an O(n) traversal.
 * Additionally, if elements are added or removed during execution
 * of this method, the returned result may be inaccurate.  Thus,
 * this method is typically not very useful in concurrent
 * applications.
 * @return the number of elements in this queue
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
    return count;

 * Returns the first live (non-deleted) node on list, or null if none.
 * This is yet another variant of poll/peek; here returning the
 * first node, not element.  We could make peek() a wrapper around
 * first(), but that would cost an extra volatile read of item,
 * and the need to add a retry loop to deal with the possibility
 * of losing a race to a concurrent poll().
Node<E> first() {
  for (;;) {
    for (Node<E> h = head, p = h, q;;) {
      boolean hasItem = (p.item != null);
      if (hasItem || (q = p.next) == null) {
        updateHead(h, p);
        return hasItem ? p : null;
      else if (p == q)
        continue restartFromHead;
        p = q;

* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;





 * Removes a single instance of the specified element from this queue,
 * if it is present.  More formally, removes an element {@code e} such
 * that {@code o.equals(e)}, if this queue contains one or more such
 * elements.
 * Returns {@code true} if this queue contained the specified element
 * (or equivalently, if this queue changed as a result of the call).
 * @param o element to be removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
                if (!o.equals(item)) {
                    next = succ(p);
                removed = p.casItem(item, null);
            next = succ(p);
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
    return false;



 * Returns {@code true} if this queue contains the specified element.
 * More formally, returns {@code true} if and only if this queue contains
 * at least one element {@code e} such that {@code o.equals(e)}.
 * @param o object to be checked for containment in this queue
 * @return {@code true} if this queue contains the specified element
public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    return false;


  • ConcurrentLinkedQueue的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向-个 item 为 null 的哨兵节点。第一次执行 peek 或者first 操作时会把 head 指向第一个真正的队 列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer 、poll 或者 remove 操作 , 导致计算的元素个数不精确,所以在井发情况下 size 函数不是很 有用。

  • 入队、出队都是操作使用 volatile 修饰的 tail 、 head 节点,要保证在多线程下出入队线程安全,只需要保证这两个 Node 操作的可见性和原子性即可。由于 volatile 本身可以保证可见性,所以只需要保证对两个变量操作的原子性即可。

  • offer 操作是在 tail 后面添加元素,也就是调用 tail.casNext 方法,而这个方法使用的是CAS 操作,只有一个线程会成功,然后失败的线程会循环,重新获取 tail , 再执行 casNext 方法。 poll 操作也通过类似 CAS 的算法保证出队时移除节点操作的原子性。






