25.Condition

1.与Lock的关系

Condition在同步锁synchronized中用的比较多。
Condition本身也是一个接口,其功能和wait/notify类似。

public interface Condition {
    void await() throws InterruptedException;  //等待

    void awaitUninterruptibly(); 

    long awaitNanos(long var1) throws InterruptedException;

    boolean await(long var1, TimeUnit var3) throws InterruptedException;  //计时等待

    boolean awaitUntil(Date var1) throws InterruptedException;

    void signal();  //唤醒

    void signalAll();   //唤醒全部
}

wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:(所有的Condition都是从Lock中构造出来的)

public interface Lock {
    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;

    void unlock();
    // 所有的Condition都是从Lock中构造出来的
    Condition newCondition();
}

2.使用场景

以以ArrayBlockingQueue为例:为一个用数组实现的阻塞队列,执行put(...)操作的时候,队列满了,生产者线程被阻塞;执行take()操作的时候,队列为空,消费者线程被阻塞。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    // 一把锁+两个条件
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    transient ArrayBlockingQueue<E>.Itrs itrs;
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        } else {
            this.items = new Object[capacity];
            // 构造器中创建一把锁加两个条件
            this.lock = new ReentrantLock(fair);
            // 构造器中创建一把锁加两个条件
            this.notEmpty = this.lock.newCondition();
            // 构造器中创建一把锁加两个条件
            this.notFull = this.lock.newCondition();
        }
    }
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while(this.count == this.items.length) {
                // 非满条件阻塞,队列容量已满
                this.notFull.await();
            }
            this.enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E e) {
        Object[] items = this.items;
        items[this.putIndex] = e;
        if (++this.putIndex == items.length) {
            this.putIndex = 0;
        }

        ++this.count;
        // put数据结束,通知消费者非空条件
        this.notEmpty.signal();
    }

    public E take() throws InterruptedException {
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        Object var2;
        try {
            while(this.count == 0) {
                // 阻塞于非空条件,队列元素个数为0,无法消费
                this.notEmpty.await();
            }
            var2 = this.dequeue();
        } finally {
            lock.unlock();
        }
        return var2;
    }

    private E dequeue() {
        Object[] items = this.items;
        E e = items[this.takeIndex];
        items[this.takeIndex] = null;
        if (++this.takeIndex == items.length) {
            this.takeIndex = 0;
        }

        --this.count;
        if (this.itrs != null) {
            this.itrs.elementDequeued();
        }
        // 消费成功,通知非满条件,队列中有空间,可以生产元素了。
        this.notFull.signal();
        return e;
    }

3.实现原理

Condition的使用很方便,避免了wait/notify的唤醒操作不仅唤醒生产者也会唤醒消费者问题。
由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。首先查看互斥锁和读写锁中Condition的构造方法:

public class ReentrantLock implements Lock, java.io.Serializable {
    public Condition newCondition() {
        return this.sync.newCondition();
    }
}

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    public static class ReadLock implements Lock, Serializable {
        // 读锁不支持Condition
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }
    public static class WriteLock implements Lock, java.io.Serializable {
        public Condition newCondition() {
            return this.sync.newCondition();
        }
    }
}

首先,读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    public class ConditionObject implements Condition, java.io.Serializable { 
        // Condition的所有实现,都在ConditionObject类中 
    } 
}
abstract static class Sync extends AbstractQueuedSynchronizer { 
    final ConditionObject newCondition() { 
        return new ConditionObject(); 
    } 
}

每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列

public class ConditionObject implements Condition, java.io.Serializable { 
    private transient Node firstWaiter; 
    private transient Node lastWaiter; 
}
static final class Node { 
    volatile Node prev; 
    volatile Node next;
     volatile Thread thread; 
    Node nextWaiter; 
}

4.await()实现分析

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()实现分析

public final void await() throws InterruptedException { 
    // 刚要执行await()操作,收到中断信号,抛异常 
    if (Thread.interrupted()) throw new InterruptedException(); 
    // 加入Condition的等待队列 
    Node node = addConditionWaiter(); 
    // 阻塞在Condition之前必须先释放锁,否则会死锁 
    int savedState = fullyRelease(node); 
    int interruptMode = 0; 
    while (!isOnSyncQueue(node)) { 
        // 阻塞当前线程 LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; 
    }
    // 重新获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
        interruptMode = REINTERRUPT;
    .....

关于await,有几个关键点要说明:

  1. 线程调用 await()的时候,肯定已经先拿到了锁。所以,在 addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的,代码如下:
private Node addConditionWaiter() { 
    // ... 
    Node t = lastWaiter; 
    Node node = new Node(Thread.currentThread(), Node.CONDITION); 
    if (t == null) 
        firstWaiter = node; 
    else
        t.nextWaiter = node; 
    lastWaiter = node; 
    return node; 
}
  1. 在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
  2. 线程从await中被唤醒后,必须用acquireQueued(node, savedState)方法重新拿锁。
  3. checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()方法是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()方法也会返回。
  4. isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只 在Condition的队列里,而不在AQS的队列里。但执行notity操作的时候,会放进AQS的同步队列。

5.signal实现

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal()

        public final void signal() {
            //// 只有持有锁的线程,才有资格调用signal()方法
            if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            } else {
                AbstractQueuedSynchronizer.Node first = this.firstWaiter;
                if (first != null) {
                    // 发起通知
                    this.doSignal(first);
                }
            }
        }

        // 唤醒队列中的第1个线程
        private void doSignal(AbstractQueuedSynchronizer.Node first) {
            do {
                if ((this.firstWaiter = first.nextWaiter) == null) {
                    this.lastWaiter = null;
                }
                first.nextWaiter = null;
            } while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);

        }

        final boolean transferForSignal(Node node) {
            if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) 
                return false; 
            // 先把Node放入互斥锁的同步队列中,再调用unpark方法 
            Node p = enq(node); 
            int ws = p.waitStatus; 
            if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) 
                LockSupport.unpark(node.thread); 
            return true; 
        }

同 await()一样,在调用 signal()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。
然后,从队列中取出firstWaiter,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node从condition中移除并放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await()方法里面的判断条件:

  • while( ! isOnSyncQueue(node))
  • 这个判断条件满足,说明await线程不是被中断,而是被unpark唤醒的。
  • notifyAll()与此类似。

简述:
阻塞队列使用condition实现,condition必须和Lock一起使用,一般使用ReentrantLock来创建condition阻塞条件,并用ReentrantLock的锁来加锁控制并发。解决了wait/notify同时唤醒生产者和消费者的问题。读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。他们最终都调用了AQS中的newCondition,ConditionObject是condition的具体实现,内部也有一个双向链表组成的队列来存储阻塞的队列。一开始调用await会放入ConditionObject的队列,然后调用最终是调用park原语阻塞,最后调用signal的时候把线程在ConditionObject中移除,放入阻塞队列本身,并调用unpark。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容