并发十一:条件队列Condition实现分析

Condition

Condition是J.U.C包中的一个接口,提供了三个主要方法await、signal和signalAll。和Object中的wait、notify、notifyAll三个监视器方法语义一致。

监视器方法必须放到synchronized修饰的同步体内,Condition接口也是一样,要放到Lock的监视范围也就是lock和unlock之间的代码块内调用,否则会抛出IllegalMonitorStateException异常。

一个小栗子:

public class ConditionTest {
    public static class ResponseFuture {
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
        private String response;

        public boolean isDone() {// 是否处理完成
            return response != null;
        }

        public String getResponse() throws InterruptedException{ // 获取响应
            if (!isDone()) {
                lock.lock();
                try {
                    while (!isDone()) {
                        condition.await();// 线程阻塞等待 
                        if (isDone()) {
                            break;
                        }
                    }
                } finally {
                    lock.unlock();
                }
            }
            return response;
        }

        public void done(String response) {// 处理完成
            lock.lock();
            try {
                this.response = response;
                condition.signal();// 唤醒阻塞等待的线程
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        final ResponseFuture responseFuture = new ResponseFuture();
        new Thread(new Runnable() {// 请求线程
            public void run() {
                System.out.println("发送一个同步请求");
                try {
                    // 获取反馈内容,请求没有反馈就会一直等待
                    System.out.println(responseFuture.getResponse());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        
        new Thread(new Runnable() {// 处理线程
            public void run() {
                try {
                    Thread.sleep(10000);// 模拟处理一会
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 处理完成
                responseFuture.done("请求处理完成");
            }
        }).start();
    }
}

ConditionObjec

Condition接口的实现ConditionObject是AbstractQueuedSynchronizer的内部
在AQS()中,:

public class ConditionObject implements Condition, 
                java.io.Serializable {
    private static final int REINTERRUPT = 1;
    private static final int THROW_IE = -1;
    /** 条件队列首节点 */
    private transient Node firstWaiter;
    /** 条件队列尾节点 */
    private transient Node lastWaiter;
    public ConditionObject() {}
    ... ...
}

原理

维护一个链表Node队列,用来进行阻塞线程的排队和调度,称为条件队列。

线程调用了condition.await()后将先释放该线程持有的锁,构造成Node加入条件队列,然后再挂起该线程。

其他线程调用了condition.signal()后,将条件队列中第一个等待的Node节点转移到AQS同步队列,在AQS同步队列中参与锁获取的调度,如果获取到锁则该线程被唤醒。

await流程

// s1
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();// 如果中断 抛出异常
            Node node = addConditionWaiter();// 入列 s2
            int savedState = fullyRelease(node);// 释放当前锁 s3
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {//  不在与同步队列
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

s1:线程未中断开始入列,转入s2,否则直接抛出异常。
入列成功开始释放当前线程持有的锁进入s3。
isOnSyncQueue(node)判断节点是否在AQS同步队列中,如果不在,说明该线程还没有资格参与锁的竞争,将当前线程挂起。
直到node加入了AQS同步队列或者node对应的线程被中断,才退出while{}循环。
checkInterruptWhileWaiting方法会检查中断发生的时机:
signal之前发生的中断返回THROW_IE,表示抛出InterruptedException
signal之后发生的中断返回REINTERRUPT,表示记录中断状态
否则返回0,表示无中断
即便是发生中断transferAfterCancelledWait方法依然会将node转移到AQS同步队列。
调用acquireQueued()方法在AQS同步队列中竞争锁,拿到锁后开始处理中断,reportInterruptAfterWait方法中根据interruptMode选择是抛出异常还是记录状态。

// s2
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果尾节点状态不为CONDITION,清出队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 构造节点并入列
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

s2:如果尾节点t不为空或状态不为CONDITION,调用
unlinkCancelledWaiters()方法,这是一个循环方法目的是从头节点开始摘掉所以状态为不为CONDITION的节点。
t==null说明当前的队列还是空队列,将node赋给头节点firstWaiter,否则,将node链入尾部。将尾节点lastWaiter指向node,返回s1。

// s3
final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 获取当前同步状态
            if (release(savedState)) {// 释放锁
                failed = false;
                return savedState;// 释放成功返回同步状态
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)// 释放失败将节点标识为取消
                node.waitStatus = Node.CANCELLED;
        }
    }

s3:用release()方法释放当前锁,这是独占锁的释放方法,所以Condition对象只支持在独占锁中使用。释放完毕返回s1。

signal流程

public final void signal() {
    if (!isHeldExclusively())//非持有线程 抛出异常
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;//头节点
    if (first != null)//队列非空
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

s1: 如果first.nextWaiter==null说明队列为空,将尾节点lastWaiter也置空。
first.nextWaiter = null将头节点从队列摘掉。
转入s2进行节点转移。
转移成功则退出循环,转移不成功并且队列中还有等待的节点继续转移下个节点。

// s2
final boolean transferForSignal(Node node) {
    //节点状态从Node.CONDITION 置换为0,
    //因为CONDITION是条件节点的状态
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);//入同步队列
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

s2:CAS将节点状态由CONDITION设置为0。
enq(node)为AQS中同步队列的入列方法,将节点加入同步队列。
ws > 0说明节点被取消了(有可能中断)或者将节点状态置为SIGNAL失败,则直接唤醒线程。
被唤醒的线程会在"await流程s1处"参与锁的竞争或者处理中断。

signalAll和signal逻辑一样,只是在循环中执行transferForSignal(first),将条件队列中的节点依次全部转移到同步队列。

小结:

1:Condition只能使用在独占锁中使用,如ReentrantLock、ReentrantReadWriteLock.writerLock。在共享锁中则无法使用 ,如ReentrantReadWriteLock.readerLock。

2:Condition中的方法,必须要放到Lock的监视范围内,也就是lock和unlock之间的代码块内调用。

码字不易,转载请保留原文连接https://www.jianshu.com/p/e72b43ebd788

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容