谈谈CountDownLatch和CyclicBarrier

Java中CountDownLatch和CyclicBarrier都是用来做多线程同步的。下面分析一下他们功能的异同。

CountDownLatch

CountDownLatch基于AQS(同步器AbstractQueuedSynchronizer浅析),CountDownLatch中有一个内部类Sync,Sync继承自AbstractQueuedSynchronizer。
我们先看一个CountDownLatch的例子,然后再具体分析源码。

一个CountDownLatch例子

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {

    private ExecutorService executorService;
    private CountDownLatch countDownLatch;
    private int parties;

    public static void main(String[] args){
        CountDownLatchExample countDownLatchExample = new CountDownLatchExample(10);
        try {
            countDownLatchExample.example();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public CountDownLatchExample(int parties) {
        executorService = Executors.newFixedThreadPool(parties);
        countDownLatch = new CountDownLatch(parties);
        this.parties = parties;
    }

    public void example() throws InterruptedException {
        for (int i = 0; i < parties; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " gets job done");
                countDownLatch.countDown(); // 线程完成任务之后countDown
            });
        }
        // 等待所有的任务完成
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + " reach barrier");
        executorService.shutdown();
    }
}

上面是一个CountDownLatch的例子,CountDownLatch的API还是很简单的,主要就是countDown和await两个方法。CountDownLatch实例化时将count设置为AQS的state,每次countDown时CAS将state设置为state - 1,await时首先会检查当前state是否为0,如果为0则代表所有的任务完成了,await结束,否则主线程将循环重试,直到线程被中断或者任务完成或者等待超时。下面具体看看CountDownLatch的源码。

CountDownLatch源码分析

首先看一下CountDownLatch的构造函数和Sync的代码:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count); // 设置Sync(AQS)的state为count
}
    
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count); // 设置AQS的state为count
    }

    int getCount() {
        return getState();
    }

    // 
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1; // 重写了AQS的方法
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) { // 循环CAS重试
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CountDownLatch构造参数count代表当前参与同步的线程数目,然后设置当前AQS的状态为count。Sync覆写了tryAcquireShared和tryReleaseShared方法。tryAcquireShared方法中会判断当前AQS的state是否为0,如果是0才能获取成功(返回1),否则,获取失败(返回-1)。tryReleaseShared通过for循环进行CAS设置状态。
CountDownLatch中最主要的两个方法是:countDown和await:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}

每次一个任务完成之后,调用CountDownLatch的countDown方法,将当前AQS的state减1(AQS初始state为count)。countDown的逻辑其实比较简单,就是通过重试CAS设置当前AQS的state为state - 1。这里着重看一下await方法,await方法体中调用AQS的acquireSharedInterruptibly或者tryAcquireSharedNanos方法。看一下AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // tryAcquireShared这里已经在Sync中重写了
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) { // 循环重试
            final Node p = node.predecessor(); // 对于CountDownLatch来说,等待队列中其实只有一个main线程在等待,因此这里第一次就应该判断条件`p == head`成立
            if (p == head) {
                int r = tryAcquireShared(arg); // 方法已经在CountDownLatch$Sync中重写了
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 如果当前失败,则挂起线程,循环重试
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

前面已经分析过AQS了,这里对AQS部分不多做解释了。主要是这里的tryAcquireShared方法,CountDownLatch的Sync内部类中重写了该方法。如果当前state等于0才能获取成功,因此只有当所有任务完成,此时AQS的state为0,await方法才会返回(不考虑中断和超时)。

CyclicBarrier

CyclicBarrier较CountDownLatch而言主要多了两个功能:

  1. 支持重置状态,达到循环利用的目的。这也是Cyclic的由来。CyclicBarrier中有一个内部类Generation,代表当前的同步处于哪一个阶段。当最后一个任务完成,执行任务的线程会通过nextGeneration方法来重置Generation。也可以通过CyclicBarrier的reset方法来重置Generation。
  2. 支持barrierCommand,当最后一个任务运行完成,执行任务的线程会检查CyclicBarrier的barrierCommand是否为null,如果不为null,则运行该任务。

一个CyclicBarrier例子

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample {

    private ExecutorService executorService;
    private CyclicBarrier cyclicBarrier;
    private int parties;

    public CyclicBarrierExample(int parties) {
        executorService = Executors.newFixedThreadPool(parties);
        cyclicBarrier = new CyclicBarrier(parties, () -> System.out.println(Thread.currentThread().getName() + " gets barrierCommand done"));
        this.parties = parties;
    }

    public static void main(String[] args) {
        CyclicBarrierExample cyclicBarrierExample = new CyclicBarrierExample(10);
        cyclicBarrierExample.example();
    }

    public void example() {
        for (int i = 0; i < parties; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " gets job done");
            });
        }
        executorService.shutdown();
    }
}

可以看到这里CyclicBarrier主要的API就是await方法,每个任务最后调用这个方法等待最后一个任务完成,在这之前所有的线程都会等待。

CyclicBarrier源码分析

CyclicBarrier主要有以下属性:

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock(); // 锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition(); // 锁关联的Condition,用于线程同步
/** The number of parties */
private final int parties; // 多少个任务参与同步
/* The command to run when tripped */
private final Runnable barrierCommand; // 最后一个任务运行线程应该执行的command
/** The current generation */
private Generation generation = new Generation(); // 当前CyclicBarrier所处的Generation

/**
 * Number of parties still waiting. Counts down from parties to 0
 * on each generation.  It is reset to parties on each new
 * generation or when broken.
 */
private int count; // 剩余的等待的任务数目,取值范围:[0-parties]

CyclicBarrier主要的API就是await方法和reset方法。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L); // 等待所有任务完成
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        // 开启下一个Generation,达到循环使用的目的
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

在每一个任务结束时我们调用CyclicBarrier的await方法,所有的线程都会等待最后一个任务完成才会退出。注意CountDownLatch中任务执行线程调用完了countDown方法之后就会退出,不会等待最后一个任务完成才会退出,这也是CountDownLatch和CyclicBarrier的一个区别。
我们主要看一下dowait方法:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock; // 加锁
    lock.lock();
    try {
        final Generation g = generation; // 当前CyclicBarrier所处的Generation

        if (g.broken) // 检查Generation的broken标志位
            throw new BrokenBarrierException(); // 被中断

        if (Thread.interrupted()) { // 检查当前线程是否被中断,响应中断
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count; // 剩余等待任务数
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null) // 如果当前任务是最后一个任务,则执行任务线程运行barrierCommand
                    command.run();
                ranAction = true;
                nextGeneration(); // 重置Generation
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // 阻塞当前线程,等待最后一个任务完成,然后线程会通过nextGeneration方法调用trip.signallAll唤醒等待线程。注意当线程进入WAITING(调用await)状态或者TIMED_WAITING(awaitNanos)状态后会让出锁。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // 响应中断
                throw new BrokenBarrierException();

            if (g != generation) // 已经是下一个generation,说明最后一个任务已经执行完成,返回
                return index;

            if (timed && nanos <= 0L) { // 检查是否超时
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
  1. 首先会检查当前Generation的broken标志位,如果为true,则抛出异常。再响应中断请求。
  2. 检查当前是否为最后一个任务,如果是则检查barrierCommand是否为null,如果不为null则执行任务。如果不是最后一个任务则到步骤3。
  3. 判断是否需要阻塞当前线程,如果需要则通过trip的await或者awaitNanos方法使其进入休眠,注意休眠后线程会让出锁。如果休眠超时则抛出TimeoutException异常。休眠完成后(要么休眠超时要么被最有一个任务执行线程唤醒)会响应中断请求,再判断当前generation是否改变(最后一个执行任务线程会通过nextGeneration方法改变generation),如果改变直接返回。

其流程图如下:

CyclicBarrierDoAwait

总结

  1. CountDownLatch和CyclicBarrier都是用作多线程同步,CountDownLatch基于AQS,CyclicBarrier基于ReentrantLock。
  2. CyclicBarrier支持复用和barrierCommand,但是CountDownLatch不支持。
  3. CyclicBarrier会阻塞线程,在最后一个任务执行线程完成之前,其余线程都必须等待,而线程在调用CountDownLatch的countDown方法之后就会结束。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容