12. SynchronousQueue





SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。这两个类继承自"Nonblocking Concurrent Objects with Condition Synchronization"算法,此算法是由W. N. Scherer III 和 M. L. Scott提出的,关于此算法的理论内容在这个网站中:http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html。两个类的性能差不多,FIFO通常用于在竞争下支持更高的吞吐量,而LIFO在一般的应用中保证更高的线程局部性。

队列(或者栈)的节点在任何时间要么是"date"模式 —— 通过put操作提供的元素的模式,要么是"request"模式 —— 通过take操作取出元素的模式,要么为空。还有一个模式是"fulfill"模式,当队列有一个data节点时,请求从队列中获取一个元素就会构造一个"fulfill"模式的节点,反之亦然。这个类最有趣的特性在于任何操作都能够计算出现在队列头节点处于什么模式,然后根据它进行操作而无需使用锁。



SynchronousQueue中使用的队列和栈的算法和"Nonblocking Concurrent Objects with Condition Synchronization"算法相比是不同的版本,包括对取消的处理。主要的差别如下:

  1. 最初的算法使用了位标记指针,但是此类在结点中使用了模式位,这导致了很多深入的改变。
  2. SynchronousQueue必须阻塞线程,直到变为fulfilled模式。
  3. 支持取消操作,通过超时和中断方式,包括清除被取消的结点/线程,以避免无法进行垃圾回收和无用的内存消耗。




abstract static class Transferer<E> {
     * 执行put或者take操作/
     * 如果参数e非空,这个元素将被交给一个消费线程;如果为null,
     * 则请求返回一个被生产者提交的元素。
     * 如果返回的结果非空,那么元素被提交了或被接受了;如果为null,
     * 这个操作可能因为超时或者中断失败了。调用者可以通过检查
     * Thread.interrupted来区分到底是因为什么元素失败。
    abstract E transfer(E e, boolean timed, long nanos);


这个类继承自Scherer-Scott的 dual stack 算法,但不完全相同,它使用结点而不是位标记指针。

static final class TransferStack<E> extends Transferer<E> {

    /* Modes for SNodes, ORed together in node fields */
    /** 表示一个未满足的消费者 */
    static final int REQUEST    = 0;
    /** 表示一个未满足的生产者 */
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;

    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // 栈中的下一个结点
        volatile SNode match;       // 匹配此结点的结点
        volatile Thread waiter;     // 控制 park/unpark
        Object item;                // 数据
        int mode;

核心算法 transfer


  1. 如果头节点为空或者已经包含了相同模式的结点,那么尝试将结点
  2. 如果头节点是一个模式不同的结点,尝试将一个fulfilling结点加入到栈中,匹配相应的等待结点,然后一起从栈中弹出,并且返回匹配的元素。匹配和弹出操作可能无法进行,由于其他线程正在执行操作3
  3. 如果栈顶已经有了一个fulfilling结点,帮助它完成它的匹配和弹出操作,然后继续。
E transfer(E e, boolean timed, long nanos) {
     * 基础算法,循环尝试下面三种操作中的一个:
     * 1. 如果头节点为空或者已经包含了相同模式的结点,尝试将结点
     *    增加到栈中并且等待匹配。如果被取消,返回null
     * 2. 如果头节点是一个模式不同的结点,尝试将一个`fulfilling`结点加入
     *    到栈中,匹配相应的等待结点,然后一起从栈中弹出,
     *    并且返回匹配的元素。匹配和弹出操作可能无法进行,
     *    由于其他线程正在执行操作3
     * 3. 如果栈顶已经有了一个`fulfilling`结点,帮助它完成
     *    它的匹配和弹出操作,然后继续。

    SNode s = null; // constructed/reused as needed
    // 传入参数为null代表请求获取一个元素,否则表示插入元素
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // 如果头节点为空或者和当前模式相同
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置超时时间为 0,立刻返回
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                    return null;
            // 构造一个结点并且设为头节点
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 等待满足
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    return null;
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
        // 检查头节点是否为FULFILLIING
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新头节点为自己
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 循环直到匹配成功
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
        // 帮助满足的结点匹配
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink



SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = shouldSpin(s)
        : 0;
    for (;;) {
        if (w.isInterrupted())
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
        if (spins > 0) {
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);


如果先take再put时,插入线程则会构建一个模式为[11]的结点,而11 & FULFILLING != 0, 所以isFulfilling(h.mode)方法会返回true。



void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // 删除头部被取消的节点
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // 移除中间的节点
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
            p = n;


static final class TransferQueue<E> extends Transferer<E> {
     * This extends Scherer-Scott dual queue algorithm, differing,
     * among other ways, by using modes within nodes rather than
     * marked pointers. The algorithm is a little simpler than
     * that for stacks because fulfillers do not need explicit
     * nodes, and matching is done by CAS'ing QNode.item field
     * from non-null to null (for put) or vice versa (for take).

    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;


  1. 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
  2. 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 如果队列为空或者模式与头节点相同
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 如果有其他线程修改了tail,进入下一循环重读
            if (t != tail)                  // inconsistent read
            // 如果有其他线程修改了tail,尝试cas更新尾节点,进入下一循环重读
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
            // 超时返回
            if (timed && nanos <= 0L)       // can't wait
                return null;
            // 构建一个新节点
            if (s == null)
                s = new QNode(e, isData);
            // 尝试CAS设置尾节点的next字段指向自己
            // 如果失败,重试
            if (!t.casNext(null, s))        // failed to link in
            // cas设置当前节点为尾节点
            advanceTail(t, s);              // swing tail and wait
            // 等待匹配的节点
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果被取消,删除自己,返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;

            // 如果此节点没有被模式匹配的线程出队
            // 那么自己进行出队操作
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            // 数据不一致,重读
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled     m已经匹配成功了
                x == m ||                   // m cancelled             m被取消了
                !m.casItem(x, e)) {         // lost CAS                CAS竞争失败
                // 上面三个条件无论哪一个满足,都证明m已经失效无用了,
                // 需要将其出队
                advanceHead(h, m);          // dequeue and retry

            // 成功匹配,依然需要将节点出队
            advanceHead(h, m);              // successfully fulfilled
            // 唤醒匹配节点,如果它被阻塞了
            return (x != null) ? (E)x : e;

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        : 0;
    for (;;) {
        if (w.isInterrupted())
        Object x = s.item;
        // item被修改后返回
        // 如果put操作在此等待,item会被更新为null
        // 如果take操作再次等待,item会由null变为一个值
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
        if (spins > 0) {
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);



public operations


public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        throw new InterruptedException();

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;


public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    throw new InterruptedException();

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();

public E poll() {
    return transferer.transfer(null, true, 0);


  1. 可以指定锁的公平性
  2. 队列内部不会存储元素,所以尽量避免使用add,offer此类立即返回的方法,除非有特殊需求
