CountDownLatch使用及原理

一、概念

CountDownLatch可以使一个或多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

二、常用方法说明

  • CountDownLatch(int count):构造方法,创建一个值为count 的计数器。

  • await():阻塞当前线程,将当前线程加入阻塞队列。

  • await(long timeout, TimeUnit unit):在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,

  • countDown():对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

三、使用场景

3.1 多线程优化报表统计
3.1.1 功能现状

运营系统有统计报表、业务为统计每日的用户新增数量、订单数量、商品的总销量、总销售额......等多项指标统一展示出来,因为数据量比较大,统计指标涉及到的业务范围也比较多,所以这个统计报表的页面一直加载很慢,所以需要对统计报表这块性能需进行优化。

3.1.2 问题分析

统计报表页面涉及到的统计指标数据比较多,每个指标需要单独的去查询统计数据库数据,单个指标只要几秒钟,但是页面的指标有10多个,所以整体下来页面渲染需要将近一分钟。

3.1.3 解决方案

任务时间长是因为统计指标多,而且指标是串行的方式去进行统计的,我们只需要考虑把这些指标从串行化的执行方式改成并行的执行方式,那么整个页面的时间的渲染时间就会大大的缩短, 如何让多个线程同步的执行任务,我们这里考虑使用多线程,每个查询任务单独创建一个线程去执行,这样每个统计指标就可以并行的处理了。

3.1.4 要求

因为主线程需要每个线程的统计结果进行聚合,然后返回给前端渲染,所以这里需要提供一种机制让主线程等所有的子线程都执行完之后再对每个线程统计的指标进行聚合。 这里我们使用CountDownLatch 来完成此功能。

3.1.5 模拟代码

1、分别统计4个指标用户新增数量、订单数量、商品的总销量、总销售额;

2、假设每个指标执行时间为3秒。如果是串行化的统计方式那么总执行时间会为12秒。

3、我们这里使用多线程并行,开启4个子线程分别进行统计

4、主线程等待4个子线程都执行完毕之后,返回结果给前端。

【代码】

public class CountDownLatchTest {

    //用于聚合所有的统计指标
    private static Map map = new HashMap();

    //创建计数器,这里需要统计4个指标
    private static CountDownLatch countDownLatch = new CountDownLatch(4);

    public static void main(String[] args) {

        //记录开始时间
        long startTime = System.currentTimeMillis();

        Thread countUserThread = new Thread(() -> {
            try {
                System.out.println("正在统计新增用户数量");

                //任务执行需要3秒
                Thread.sleep(3000);

                //保存结果值
                map.put("userNumber", 1);

                //标记已经完成一个任务
                countDownLatch.countDown();
                System.out.println("统计新增用户数量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        Thread countOrderThread = new Thread(() -> {
            try {
                System.out.println("正在统计订单数量");

                //任务执行需要3秒
                Thread.sleep(3000);

                //保存结果值
                map.put("countOrder", 2);

                //标记已经完成一个任务
                countDownLatch.countDown();
                System.out.println("统计订单数量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });

        Thread countGoodsThread = new Thread(() -> {
            try {
                System.out.println("正在商品销量");

                //任务执行需要3秒
                Thread.sleep(3000);

                //保存结果值
                map.put("countGoods", 3);

                //标记已经完成一个任务
                countDownLatch.countDown();
                System.out.println("统计商品销量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread countmoneyThread = new Thread(() -> {
            try {
                System.out.println("正在总销售额");

                //任务执行需要3秒
                Thread.sleep(3000);

                //保存结果值
                map.put("countmoney", 4);

                //标记已经完成一个任务
                countDownLatch.countDown();
                System.out.println("统计销售额完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        //启动子线程执行任务
        countUserThread.start();
        countGoodsThread.start();
        countOrderThread.start();
        countmoneyThread.start();

        try {
            //主线程等待所有统计指标执行完毕
            countDownLatch.await();

            //记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("------统计指标全部完成--------");
            System.out.println("统计结果为:" + map.toString());
            System.out.println("任务总执行时间为" + (endTime - startTime) / 1000 + "秒");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

【结果】

正在统计新增用户数量
正在统计订单数量
正在总销售额
正在商品销量
统计新增用户数量完毕
统计订单数量完毕
统计商品销量完毕
------统计指标全部完成--------
统计销售额完毕
统计结果为:{countmoney=4, userNumber=1, countGoods=3}
任务总执行时间为3秒

Process finished with exit code 0
3.2 模拟高并发场景
3.2.1 场景说明

创建开启n个线程,等线程都创建准备好后,再同时执行业务逻辑,达到模拟并发场景的目标。

3.2.2 模拟代码

【代码】

public class CountDownLatchTest2 {
    public static void main(String[] args) {
        //模拟线程数
        int times = 1000;

        //用于模拟线程安全
        final AtomicInteger atomicInteger = new AtomicInteger(0);

        //用于模拟线程非安全
        final Member member = new Member();

        //相当于计数器,当所有线程都准备好了,再一起执行,模仿多并发,保证并发量
        final CountDownLatch countDownLatch = new CountDownLatch(times);

        //保证所有线程执行完了再打印atomicInteger、用户年龄的值
        final CountDownLatch countDownLatch2 = new CountDownLatch(times);

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        try {
            for (int i = 0; i < times; i++) {
                executorService.submit(() -> {
                    try {
                        //一直阻塞当前线程,直到计时器的值为0,保证同时并发
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println("当前线程执行时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));

                    //数量自增1:线程安全(执行n次,每次都是1000)
                    atomicInteger.incrementAndGet();

                    //用户年龄自增1:线程不安全(执行n次,每次结果可能不一样)
                    member.setAge(member.getAge() + 1);

                    //当前线程执行完,计数器减一
                    countDownLatch2.countDown();
                });
                countDownLatch.countDown();
            }

            //保证所有线程执行完
            countDownLatch2.await();

            //打印结果
            System.out.println("atomicInteger = " + atomicInteger);
            System.out.println("age = " + member.getAge());

            executorService.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class Member {

    private int age;

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

【结果】

--备注:前面的时间省略
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
atomicInteger = 1000
age = 995

Process finished with exit code 0

四、实现原理

4.1 创建计数器

当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);//创建同步队列,并设置初始计数器值
 }
4.2 阻塞线程

当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}

判断计数器是技术完毕,未完毕则把当前线程加入阻塞队列

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
 }

构建阻塞队列的双向链表,挂起当前线程

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //新建节点加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获得当前节点pre节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回锁的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重组双向链表,清空无效节点,挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.3 计数器递减

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。

public void countDown() {
        //递减锁重入次数,当state=0时唤醒所有阻塞线程
        sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        //递减锁的重入次数
        if (tryReleaseShared(arg)) {
            doReleaseShared();//唤醒队列所有阻塞的节点
            return true;
        }
        return false;
    }
 private void doReleaseShared() {
        //唤醒所有阻塞队列里面的线程
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始
                        continue;
                    unparkSuccessor(h);//成功则唤醒线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
 }

资料来源:CountDownLatch的使用和原理解析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容