从源码看JDK8并发工具类CountDownLatch的实现原理

CountDownLatch,是几个重要的并发编程工具类之一,字面意思就是门锁的意思,内部会维护一个计数器的常量,这个常量代表执行的线程数。

在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行。当然,使用线程间消息通信机制也可以完成。其实,java并发工具类中为我们提供了类似“倒计时”(CountDownLatch)这样的工具类,可以十分方便的完成所说的这种业务场景。

CountDownLatch允许一个或多个线程等待其他线程完成操作,调用await()方法的线程回去判断count的值来判断是否会被挂起,它会等待直到count值为0才会继续执行。控制台输出count=0最后输出,这个时候就看cpu切换到哪个线程上执行了,在初始化的时候我们会设置好count的值,当每调用一次countDown()方法,会使count的值减一也就是将AQS维护同步状态的state值减一。

在我们阅读源码之前,如果你看过AQS源码(https://www.jianshu.com/p/e0066f9349cd)与跟可重入锁(https://www.jianshu.com/p/5d57573b09f5)相关的内容,你会更加对CountDownLatch本身是如何实现的以及他的本质有一个更透彻的理解。

说说我的理解之前看看他的类

可以看到他同样运用了一个继承了AQS同步器的静态内部类来重写父类AQS里面的一些方法然后再调用该父类里面的获取锁的方法来实现具体的功能。

来看看构造方法中:

//设置初始化count的值,并传递给Sync类
public CountDownLatch(int count) {
       if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
   }

Sync类的源码如下:CountDownLatch的实现依赖于AQS

先介绍下两个方法 countDown()每执行一次该方法,也就是将由AQS维护的同步状态值state值减1,其一般是执行任务的线程调用。
调用countDown()释放同步状态,每次调用同步状态值-1。

 public void countDown() {
        sync.releaseShared(1);
    }

//父类AQS中
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {   //如果释放同步状态线程成功,如果返回false,则表示,获取失败同步状态。
  //返回flase,以CountDownLatch的实现角度来讲,此时还要等待N(N>0)个线程,因为state还没减到等于0,如果返回true,表示此时已经执行N次了,此时state已经减到0了,这时候会执行doReleaseShared(),表示释放其他处于等待的节点。
            doReleaseShared();   //唤醒后续处于等待的节点,看下面具体的解释。
            return true;
        }
        return false;
    }

//在CountDownLatch的静态内部工具类Sync继承了AQS重写的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // 自旋
            for (;;) {
                int c = getState();  //获取AQS维护的state值
                if (c == 0)   //如果为0,表示没有一个线程在运行返回false
                    return false;
                int nextc = c-1;     //如果不等于0,这里肯定会>0的,所以减去1
                if (compareAndSetState(c, nextc))  //CAS去直接修改内存地址的偏移量去修改值,保证线程安全。
                 return nextc == 0;       //重点来了。这里的意思是如果共享式获取同步状态后,state还不是为0,则获取失败。返回false
            }
        }

下面这个类,在我的这篇文章也解析过了。


await(),当执行该方法是,内部会检查那个计数常量的值,如果不等于0,就会进入等待(waiting)状态,直到执行了countDown使内部的值减到0的时候,就会恢复线程,同时执行,我们来看看实现:

public void await() throws InterruptedException {
        //共享式获取
        sync.acquireSharedInterruptibly(1);
    }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())  //响应中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)   
      //tryAcquireShared(arg) 返回1,此时state=0不阻塞,返回的是-1,执行doAcquireSharedInterruptibly(arg);
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;   
        }

这里需要解释下doAcquireSharedInterruptibly的主要作用:1、将当前线程构造成共享模式的节点,通过自旋的方式尝试获取同步状态2、如果获取同步状态成功,则唤醒后续处于共享模式的节点;如果没有获取到同步状态,则对调用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法挂起当前线程,这样可以避免该线程无限循环而获取不到共享锁,从而造成资源浪费。这里需要注意的是:当有多个线程调用await()方法时,这些线程都会通过addWaiter(Node.SHARED)方法被构造成节点加入到等待队列中。当最后一个调用countDown()方法的线程执行了countDown()后(这里有点拗口),会唤醒处于等待队列中距离头节点最近的一个节点,也就是说该线程被唤醒之后会继续自旋尝试获取同步状态,此时执行到tryAcquireShared(int)方法时,发现r大于0(因为state已经被置为0了),该线程就会调用setHeadAndPropagate(Node, int)方法将唤醒传递下去,并且退出当前循环,开始执行awat()方法之后的代码。


然后说说CountDownLatch的两种用法:

1.可以设置new CountDownLatch(1); 如果需要控制多个线程同时开始执行的时候,可以每个线程刚开始执行run的时候,先执行await,
进入等待状态。当最后所有线程都准备好了,就调用countDown,减一,这时所有线程就会主动同时开始执行。
2.假设可以设置new CountDownLatch(10),这时有10个线程,我们需要做的是等10个线程,依次执行countDown(),等到所有线程都执行好了,这时候再执行await。所有线程都准备就绪了。

await(long timeout,TimeUtil unit)
作用使线程在指定的最大时间内,处于await状态,超过这个时间就会自动唤醒了。
getCount()
能够获取当前计数的值。

下面举一个实现的例子:

默认10个运动员进行跑步比赛的全过程:

public class MyThread extends Thread{

    /**等待运动员到来*/
    private CountDownLatch comingTag;
    /**等待裁判说开始*/
    private CountDownLatch waitTag;
    /** 等待起跑*/
    private CountDownLatch waitRunTag;
    /**起跑*/
    private CountDownLatch beginTag;
    /** 所有运动员道终点*/
    private CountDownLatch endTag;

    public MyThread(CountDownLatch comingTag, CountDownLatch waitTag, CountDownLatch waitRunTag, CountDownLatch beginTag, CountDownLatch endTag) {
        super();
        this.comingTag = comingTag;
        this.waitTag = waitTag;
        this.waitRunTag = waitRunTag;
        this.beginTag = beginTag;
        this.endTag = endTag;
    }

    @Override
    public void run() {
        try {
            System.out.println("运动员正陆续入场");
            Thread.sleep((int)Math.random()*10000);
            System.out.println(Thread.currentThread().getName()+"到起跑点了");
            comingTag.countDown();
            System.out.println("等待裁判说准备");
            waitTag.await();
            System.out.println("准备。。。。。开始");
            waitRunTag.countDown();
            beginTag.await();
            System.out.println(Thread.currentThread().getName()+"开始跑,并且跑步过程不确定");
            Thread.sleep((int)Math.random()*10000);
            endTag.countDown();
            System.out.println(Thread.currentThread().getName()+"到达终点");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试类:

public class Run {

    public static void main(String[] args) {


        CountDownLatch comingTag = new CountDownLatch(10);
        CountDownLatch waitTag=new CountDownLatch(1);
        CountDownLatch waitRunTag = new CountDownLatch(10);
        CountDownLatch beginTag=new CountDownLatch(1);
        CountDownLatch endTag = new CountDownLatch(10);

        MyThread[] threads=new MyThread[10];

        for(int i=0;i<threads.length;i++){
            threads[i]=new MyThread(comingTag,waitTag,waitRunTag,beginTag,endTag);
            threads[i].setName("运动员"+(i+1));
            threads[i].start();
        }

        try {
            System.out.println("裁判正在等待选手的到来。。。。");
            comingTag.await();
            System.out.println("所有的运动员都到齐了,准备开始,各就位。。。。预备");
            Thread.sleep(5000);
            waitTag.countDown();
            System.out.println("各就各位。。。。");
            waitRunTag.await();
            Thread.sleep(2000);
            System.out.println("命令枪,开!!!");
            beginTag.countDown();
            endTag.await();
            System.out.println("所有运动员都到得终点了。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
image.png

image.png

一般来说,都会把CountDownLatch与CyclicBarrier进行比较?

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。
调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;
CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;
CountDownLatch是不能复用的,而CyclicBarrier是可以复用的。就是说,当CountDownLatch执行countDown时如果此时countDown执行的state的值减到0了,这时候再调用,不能循环执行了,而CyclicBarrier是可以的,可以看一下这篇文章:
https://www.jianshu.com/p/ff6c2ef5e8c2

整理不易,喜欢可以关注我

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容