CyclicBarrier源码解析

一个多线程协同器,它可以让一组线程相互等待,当等待的数量达到预设数量时这组线程通过等待继续工作。说得形象点,CyclicBarrier就好比汽车站滚动发车的模式,把客车看着CyclicBarrier,乘客看着是各个任务线程,当乘客到达客车时,需要等待另外的乘客,当乘客到齐后自动发车,如果等待乘客超时了,则将乘客全部赶下车(司机太凶残了),然后重新安排依次上车(是否要上车由乘客自己决定);每个上车的乘客都需要判断自己是否是这辆车的最后一个乘客,如果不是,则上车后立即开始睡觉,如果是最后一个,则他需要叫醒所有乘客。当然客车站在创建这些客车的时候可能会做一些额外的事情,例如所有乘客到齐后,司机给大伙一人发一瓶矿泉水,或者是其它的,但是前提条件就是乘客到齐。


8个线程协同的CyclicBarrier

图中的CyclicBarrier需要等待8个线程到达后才会“发车”,目前已经到达的线程有4个,还需要等待4个线程;线程上车的过程(也就是进入await的过程)是要进行排队的,这里是通过ReentrantLock来实现的,上车后的睡眠是通过锁的条件等待Condition来实现的。

首先看一下它的内部整体结构

public class CyclicBarrier {
    //一个标识,标识这一次的协同是否完成(正常完成,异常完成)
    private static class Generation {
        boolean broken = false;
    }

    //线程进入条件等待时需要获取锁
    private final ReentrantLock lock = new ReentrantLock();
    //等待条件
    private final Condition trip = lock.newCondition();
    //每次需要协同的线程数(客车的准载数)
    private final int parties;
    //这组线程(parties)满足协同条件后需要做的一件事情
    private final Runnable barrierCommand;
    //标识实例,一个generation代表一次线程协同
    private Generation generation = new Generation();

    //还需要等待的线程数量(还未上车的乘客数)
    private int count;

    //最后一个乘客上车后使用的工具,唤醒所有乘客,
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation(); //换一辆车
    }
    //等待超时后,司机生气了,就用这个方法把大家叫醒,然后把这辆车标记为broken,把所有人赶下车
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    //车辆的构造器,客车占为车辆设置的规则
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    //同上,只是线程协同完成后不需要做额外的动作
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    /**
      一些核心方法
    **/
}

核心方法

CyclicBarrier的核心方法是await,该方法是线程相互等待的关键,它有两种实现,一种是带等待超时的,一种是不会等待超时:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

从代码可以看出,其核心都是使用了dowait这个方法

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock(); //获取锁,这里可以看出线程进入等待是单线程的
    try {
        final Generation g = generation;  //当前的实例标记

        if (g.broken) //新到达的线程判断实例是否正常,如不正常则抛出异常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {  //新到达的线程判断线程中断状态
            breakBarrier();  //如果线程被中断,则标记当前实例为中断,并唤醒所有等待线程
            throw new InterruptedException();
        }


        int index = --count;  //上车成功,还需要上车的人数减1
        if (index == 0) {  // tripped  //判断是否是最后一个乘客
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();  //最后一个乘客上车成功,人满,则执行客车统一的规定
                ranAction = true;
                nextGeneration();  //唤醒本车所有乘客,并帮助唤来下一辆车。
                return 0;
            } finally {
                if (!ranAction) //如果在执行客车统一任务的时候出了问题,则整趟车标记为broken,唤醒所有乘客并赶下车
                    breakBarrier();
            }
        }

        //如果上车的不是最后一个乘客
        for (;;) {
            try {
                if (!timed) 
                    trip.await(); //不需要判断睡眠时间,一直睡
                else if (nanos > 0L) //设置睡眠时间并睡眠
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {   //如果在失眠过程中被中断(这里不是被正常唤醒,是被中断)
                if (g == generation && ! g.broken) { //如果没有换车,并且客车也没被标记为broken
                    breakBarrier(); //则被中断的线程(乘客)负责将该辆车标记为中断
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();  //如果已经换车,或者被标记为了broken,则保存中断状态,继续后面的执行
                }
            }


            if (g.broken)
                throw new BrokenBarrierException();  //被标记为了broken(这可能自己前面标的,也可能其它线程标的),则所有的线程都抛出异常。

            if (g != generation)  //如果是正常被唤醒,则直接返回还需上车的人(理论上应该是0)
                return index;

            if (timed && nanos <= 0L) { //如果是应为等待超时,则抛出TimeoutException
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

总结

从代码可以看出,CyclicBarrier的实现是利用条件等待,用到条件等待当然就会用到锁。
多个线程协调过程中,只要有一个线程被中断或者发生异常,则整个协调取消。

CyclicBarrier与CountDownLatch异同点

相同点:
1.都能让多个线程协调,在某一个点上等待

不同点:
1.CyclicBarrier是多个线程自行协同,当线程到达等待数量时自动放行,而CountDownLatch是多个线程阻塞后,需要外界条件达到某种状态的时候才会被统一唤醒,即CyclicBarrier只需要各个线程await,而CountDownLatch还需要额外是countDown。
2.实现上,CyclicBarrier是使用独占锁+Condition实现的,而CountDownLatch是自己实现AQS,利用共享锁的原理实现。
3.CountDownLatch一旦满足条件后需要重新初始化才能再使用,而CyclicBarrier可以循环使用。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354