Concurrent Container BlockingQueue - SynchronousQueue

1. Official Documentation

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.

Synchronous queues are similar to rendezvous channels used in CSP and Ada. They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.

In short: it is a blocking queue in which every insert operation must wait for a corresponding remove operation by another thread, and vice versa. The queue has no internal capacity. You cannot peek at it; you cannot insert an element (by any method) unless another thread is trying to remove one; you cannot iterate over it because there is nothing to iterate. Null elements are not permitted.

SynchronousQueue is similar to the rendezvous channels used in CSP and Ada. It is well suited to handoff designs, where an object running in one thread must synchronize with an object running in another thread in order to hand over some piece of information, an event, or a task.

An optional fairness policy controls the ordering of waiting producer and consumer threads: by default the ordering is not guaranteed, but a queue constructed with fairness set to true grants threads access in FIFO order.
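
To make the handoff semantics concrete, here is a minimal usage sketch (class and string values are illustrative): put() blocks until another thread calls take(), and the element is handed directly from the producer thread to the consumer thread without being stored anywhere.

import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                System.out.println("producer: putting (blocks until a taker arrives)");
                queue.put("task-1");              // blocks here until take() is called
                System.out.println("producer: handoff complete");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        Thread.sleep(500);                        // let the producer block first
        String item = queue.take();               // rendezvous: unblocks the producer
        System.out.println("consumer: received " + item);
        producer.join();
    }
}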

This class implements extensions of the dual stack and dual
queue algorithms described in "Nonblocking Concurrent Objects
with Condition Synchronization", by W. N. Scherer III and
M. L. Scott.  18th Annual Conf. on Distributed Computing,
Oct. 2004 (see also
http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
The (Lifo) stack is used for non-fair mode, and the (Fifo)
queue for fair mode. The performance of the two is generally
similar. Fifo usually supports higher throughput under
contention but Lifo maintains higher thread locality in common
applications.

A dual queue (and similarly stack) is one that at any given
time either holds "data" -- items provided by put operations,
or "requests" -- slots representing take operations, or is
empty. A call to "fulfill" (i.e., a call requesting an item
from a queue holding data or vice versa) dequeues a
complementary node.  The most interesting feature of these
queues is that any operation can figure out which mode the
queue is in, and act accordingly without needing locks.

The class implements extensions of the dual stack and dual queue algorithms described in "Nonblocking Concurrent Objects with Condition Synchronization" by W. N. Scherer III and M. L. Scott. The LIFO stack is used for the non-fair mode and the FIFO queue for the fair mode. The performance of the two is generally similar: FIFO usually supports higher throughput under contention, while LIFO maintains better thread locality in common applications.

A dual queue (and similarly a dual stack) is, at any given time, in one of three states:

  • holding "data" -- items provided by put operations
  • holding "requests" -- slots representing take operations
  • empty

A "fulfill" call (a call of the complementary mode) dequeues the matching node. The most interesting feature of these structures is that any operation can determine which mode the queue is currently in and act accordingly without needing locks, as the simplified sketch below illustrates.
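
The following is a deliberately simplified, lock-based illustration of the dual-structure idea (the class and field names are hypothetical; the real SynchronousQueue is lock-free): the structure only ever holds waiting puts, waiting takes, or nothing, and a call of the complementary mode fulfills the oldest waiter instead of enqueuing itself.

import java.util.ArrayDeque;
import java.util.Deque;

// Toy dual queue: all waiters are always of the same mode, so one deque suffices.
final class ToyDualQueue<E> {
    private static final class Node<E> {
        final boolean isData;   // true = data node (put), false = request node (take)
        E item;                 // the handed-off item
        boolean done;           // set when this node has been fulfilled
        Node(boolean isData, E item) { this.isData = isData; this.item = item; }
    }

    private final Deque<Node<E>> waiters = new ArrayDeque<>();

    public synchronized void put(E e) throws InterruptedException {
        Node<E> head = waiters.peek();
        if (head != null && !head.isData) {   // complementary mode: fulfill a waiting take
            Node<E> req = waiters.poll();
            req.item = e;
            req.done = true;
            notifyAll();
            return;
        }
        Node<E> s = new Node<>(true, e);      // empty or same mode: enqueue and wait
        waiters.add(s);
        while (!s.done) wait();
    }

    public synchronized E take() throws InterruptedException {
        Node<E> head = waiters.peek();
        if (head != null && head.isData) {    // complementary mode: fulfill a waiting put
            Node<E> data = waiters.poll();
            data.done = true;
            notifyAll();
            return data.item;
        }
        Node<E> s = new Node<>(false, null);  // empty or same mode: enqueue and wait
        waiters.add(s);
        while (!s.done) wait();
        return s.item;
    }
}

Cancellation (a timeout or interrupt while still enqueued) is ignored here; handling it without locks is exactly one of the complications the real implementation deals with.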

Both the queue and stack extend abstract class Transferer
defining the single method transfer that does a put or a
take. These are unified into a single method because in dual
data structures, the put and take operations are symmetrical,
so nearly all code can be combined. The resulting transfer
methods are on the long side, but are easier to follow than
they would be if broken up into nearly-duplicated parts.

The queue and stack data structures share many conceptual
similarities but very few concrete details. For simplicity,
they are kept distinct so that they can later evolve
separately.

Both the queue and the stack extend the abstract class Transferer, which defines a single method, transfer, that performs either a put or a take. Because put and take are symmetrical in dual data structures, one method is enough and nearly all of the code can be shared.

The queue and stack data structures share many conceptual similarities but few concrete details; they are kept distinct so that they can evolve separately later.
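
For orientation, the shared abstraction looks roughly like the following (paraphrased from the JDK source): e is the item for a put or null for a take, and the return value is the item that was transferred, or null if the operation timed out or was interrupted.

    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e     non-null item to hand off (put), or null to request an item (take)
         * @param timed whether this operation should time out
         * @param nanos the timeout, in nanoseconds
         * @return the transferred item, or null if the wait was cancelled (timeout/interrupt)
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }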

The algorithms here differ from the versions in the above paper
in extending them for use in synchronous queues, as well as
dealing with cancellation. The main differences include:

 1. The original algorithms used bit-marked pointers, but
    the ones here use mode bits in nodes, leading to a number
    of further adaptations.
 2. SynchronousQueues must block threads waiting to become
    fulfilled.
 3. Support for cancellation via timeout and interrupts,
    including cleaning out cancelled nodes/threads
    from lists to avoid garbage retention and memory depletion.

The main ways the algorithms here differ from the versions in the paper (extending them for synchronous queues and for cancellation):

  • 1) The original algorithms used bit-marked pointers, whereas the ones here use mode bits in the nodes, leading to a number of further adaptations.
  • 2) SynchronousQueue must block threads that are waiting to become fulfilled.
  • 3) Cancellation via timeout and interrupt is supported, including cleaning cancelled nodes/threads out of the lists to avoid garbage retention and memory depletion.

Blocking is mainly accomplished using LockSupport park/unpark,
except that nodes that appear to be the next ones to become
fulfilled first spin a bit (on multiprocessors only). On very
busy synchronous queues, spinning can dramatically improve
throughput. And on less busy ones, the amount of spinning is
small enough not to be noticeable.

Cleaning is done in different ways in queues vs stacks.  For
queues, we can almost always remove a node immediately in O(1)
time (modulo retries for consistency checks) when it is
cancelled. But if it may be pinned as the current tail, it must
wait until some subsequent cancellation. For stacks, we need a
potentially O(n) traversal to be sure that we can remove the
node, but this can run concurrently with other threads
accessing the stack.

Blocking is accomplished mainly with LockSupport park/unpark, except that a node that appears to be the next one to be fulfilled first spins for a while (on multiprocessors only). On very busy synchronous queues spinning can dramatically improve throughput, and on less busy ones the amount of spinning is small enough not to be noticeable. A small sketch of this blocking primitive follows.
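
As a reminder of how that primitive behaves, here is a minimal, self-contained sketch of the spin-then-park pattern (the spin count and field layout are illustrative, not the actual awaitFulfill code):

import java.util.concurrent.locks.LockSupport;

public class SpinThenParkDemo {
    static volatile String item;              // the "slot" the waiting thread is watching

    public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(() -> {
            int spins = 32;                   // spin briefly before parking (illustrative count)
            while (item == null) {
                if (spins > 0)
                    --spins;
                else
                    LockSupport.park();       // may also return spuriously, hence the loop
            }
            System.out.println("consumer got: " + item);
        });
        consumer.start();

        Thread.sleep(100);                    // let the consumer spin and then park
        item = "hello";                       // publish the item...
        LockSupport.unpark(consumer);         // ...then wake the waiter, as transfer() does
        consumer.join();
    }
}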

Cleaning is done differently for the queue and the stack. For the queue, a cancelled node can almost always be removed immediately in O(1) time (modulo retries for consistency checks); but if it may be pinned as the current tail, removal has to wait for some subsequent cancellation. For the stack, a potentially O(n) traversal is needed to make sure the node can be removed, but it can run concurrently with other threads accessing the stack.

While garbage collection takes care of most node reclamation
issues that otherwise complicate nonblocking algorithms, care
is taken to "forget" references to data, other nodes, and
threads that might be held on to long-term by blocked
threads. In cases where setting to null would otherwise
conflict with main algorithms, this is done by changing a
node's link to now point to the node itself. This doesn't arise
much for Stack nodes (because blocked threads do not hang on to
old head pointers), but references in Queue nodes must be
aggressively forgotten to avoid reachability of everything any
node has ever referred to since arrival.

Garbage collection takes care of most node-reclamation issues that would otherwise complicate non-blocking algorithms, but references to data, other nodes, and threads that might be held long-term by blocked threads must still be explicitly "forgotten". Where setting a field to null would conflict with the main algorithm, this is done by self-linking, i.e. pointing a node's link at the node itself. This is not much of a problem for stack nodes (blocked threads do not hang on to old head pointers), but references in queue nodes must be forgotten aggressively.

2. Constructor, put and take

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) { // null => wait was cancelled (interrupt)
            Thread.interrupted();                       // clear the interrupt status
            throw new InterruptedException();
        }
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);      // null argument marks this as a take
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
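
The zero-capacity behavior is easy to observe through the non-blocking variants: in the JDK source, offer(e) and poll() call transfer with timed = true and nanos = 0, so they return immediately (false / null) when no complementary thread is waiting. A minimal sketch:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class ZeroCapacityDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();

        System.out.println(q.offer(1));   // false: no taker is currently waiting
        System.out.println(q.poll());     // null: no producer is currently waiting
        System.out.println(q.size());     // always 0: there is no internal capacity

        Thread taker = new Thread(() -> {
            try {
                System.out.println("taker got: " + q.take());   // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();
        Thread.sleep(100);                                       // give the taker time to block

        System.out.println(q.offer(2, 1, TimeUnit.SECONDS));     // true: handed off to the taker
        taker.join();
    }
}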

3. Dual Queue and Dual Stack

3.1 TransferQueue

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

Members of TransferQueue:

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it was cancelled.
         */
        transient volatile QNode cleanMe;

Constructor:

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

Now look closely at the transfer method:

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

  • Whether e is null distinguishes the calling thread as a producer (non-null e) or a consumer (null e).

There are two main cases:

  • 1) If the queue is empty or holds nodes of the same mode, enqueue the current node.
    After enqueuing, awaitFulfill() spins and then blocks the current thread until it is matched (or cancelled).
    The second half of that branch handles the post-fulfillment bookkeeping and returns the appropriate value depending on whether the caller is a producer or a consumer.
  • 2) If the call is of the complementary mode, fulfill: CAS the item of the node at the head, dequeue it, and unpark its waiting thread.

Example (fair mode):

Thread put1 executes put(1). The queue is empty, so put1's data node is enqueued and put1 blocks.

Thread put2 then executes put(2). The mode is the same, so put2's node is enqueued behind put1's and put2 blocks as well.

Thread take1 executes take(). Under the fair policy, whoever enqueued first is woken first, so put1 is unparked and take1 receives 1 (in short, the fair policy means waiters join at the tail and are matched and dequeued at the head).

Thread take2 executes take() and is matched with put2, receiving 2. A runnable sketch of this sequence is shown below.
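
A minimal runnable sketch reproducing the sequence above (thread names and sleep times are illustrative). With the fair policy the first producer to block is the first to be matched, so the values come out in FIFO order:

import java.util.concurrent.SynchronousQueue;

public class FairModeDemo {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true);   // fair: FIFO TransferQueue

        Thread put1 = producer(q, 1);
        put1.start();
        Thread.sleep(100);                            // let put1 enqueue and block first
        Thread put2 = producer(q, 2);
        put2.start();
        Thread.sleep(100);

        System.out.println("take1 -> " + q.take());   // 1: put1 enqueued first, so it is matched first
        System.out.println("take2 -> " + q.take());   // 2: put2 is matched next
        put1.join();
        put2.join();
    }

    private static Thread producer(SynchronousQueue<Integer> q, int value) {
        return new Thread(() -> {
            try {
                q.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "put" + value);
    }
}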

3.2 TransferStack

        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;

Members of TransferStack:

        /** The head (top) of the stack */
        volatile SNode head;

Again, look closely at the transfer method:

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

  • As before, whether e is null distinguishes the calling thread as a producer or a consumer (encoded here as the DATA or REQUEST mode).

There are three main cases:

  • 1) If the stack is empty or the top node has the same mode, push the current node.
    After a successful push, awaitFulfill() spins and then blocks the current thread until it is matched.
    The second half of that branch handles the post-fulfillment bookkeeping and returns the appropriate value depending on whether the caller is a producer or a consumer.
  • 2) If the top node has the complementary mode, fulfill:
    a fulfilling node is pushed first, and once the match succeeds the top two nodes are popped.
  • 3) If the top of the stack is already another fulfilling node,
    help it by performing its match and/or pop operations, then continue.

Example (non-fair mode, the default):

Thread put1 executes put(1). The stack is empty, so put1's data node is pushed and put1 blocks.

Thread put2 then executes put(2). The mode is the same, so put2's node is pushed on top of put1's and put2 blocks as well.

Thread take1 executes take() and finds put2's node at the top of the stack, so they match. In the implementation, take1's fulfilling node is pushed first, then take1 loops trying to match put2's node; once there is no concurrent interference, the head pointer is swung directly to put1's node. take1 receives 2.

Thread take2 then executes take() and is matched with put1, receiving 1. A minimal sketch of this LIFO behavior follows.
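
A minimal runnable sketch of the same sequence in the default non-fair mode (thread names and sleep times are illustrative). Waiters live on a LIFO stack, so in the current implementation the most recently blocked producer is matched first, although this ordering is not a documented guarantee:

import java.util.concurrent.SynchronousQueue;

public class NonFairModeDemo {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();       // default: LIFO TransferStack

        Thread put1 = producer(q, 1);
        put1.start();
        Thread.sleep(100);                            // let put1 push and block first
        Thread put2 = producer(q, 2);
        put2.start();
        Thread.sleep(100);

        System.out.println("take1 -> " + q.take());   // 2: put2 is at the top of the stack
        System.out.println("take2 -> " + q.take());   // 1: put1 is matched next
        put1.join();
        put2.join();
    }

    private static Thread producer(SynchronousQueue<Integer> q, int value) {
        return new Thread(() -> {
            try {
                q.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "put" + value);
    }
}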
