一个多线程协同器,它可以让一组线程相互等待,当等待的数量达到预设数量时这组线程通过等待继续工作。说得形象点,CyclicBarrier就好比汽车站滚动发车的模式,把客车看着CyclicBarrier,乘客看着是各个任务线程,当乘客到达客车时,需要等待另外的乘客,当乘客到齐后自动发车,如果等待乘客超时了,则将乘客全部赶下车(司机太凶残了),然后重新安排依次上车(是否要上车由乘客自己决定);每个上车的乘客都需要判断自己是否是这辆车的最后一个乘客,如果不是,则上车后立即开始睡觉,如果是最后一个,则他需要叫醒所有乘客。当然客车站在创建这些客车的时候可能会做一些额外的事情,例如所有乘客到齐后,司机给大伙一人发一瓶矿泉水,或者是其它的,但是前提条件就是乘客到齐。
图中的CyclicBarrier需要等待8个线程到达后才会“发车”,目前已经到达的线程有4个,还需要等待4个线程;线程上车的过程(也就是进入await的过程)是要进行排队的,这里是通过ReentrantLock来实现的,上车后的睡眠是通过锁的条件等待Condition来实现的。
首先看一下它的内部整体结构
public class CyclicBarrier {
//一个标识,标识这一次的协同是否完成(正常完成,异常完成)
private static class Generation {
boolean broken = false;
}
//线程进入条件等待时需要获取锁
private final ReentrantLock lock = new ReentrantLock();
//等待条件
private final Condition trip = lock.newCondition();
//每次需要协同的线程数(客车的准载数)
private final int parties;
//这组线程(parties)满足协同条件后需要做的一件事情
private final Runnable barrierCommand;
//标识实例,一个generation代表一次线程协同
private Generation generation = new Generation();
//还需要等待的线程数量(还未上车的乘客数)
private int count;
//最后一个乘客上车后使用的工具,唤醒所有乘客,
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation(); //换一辆车
}
//等待超时后,司机生气了,就用这个方法把大家叫醒,然后把这辆车标记为broken,把所有人赶下车
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
//车辆的构造器,客车占为车辆设置的规则
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//同上,只是线程协同完成后不需要做额外的动作
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
一些核心方法
**/
}
核心方法
CyclicBarrier的核心方法是await,该方法是线程相互等待的关键,它有两种实现,一种是带等待超时的,一种是不会等待超时:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
从代码可以看出,其核心都是使用了dowait这个方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //获取锁,这里可以看出线程进入等待是单线程的
try {
final Generation g = generation; //当前的实例标记
if (g.broken) //新到达的线程判断实例是否正常,如不正常则抛出异常
throw new BrokenBarrierException();
if (Thread.interrupted()) { //新到达的线程判断线程中断状态
breakBarrier(); //如果线程被中断,则标记当前实例为中断,并唤醒所有等待线程
throw new InterruptedException();
}
int index = --count; //上车成功,还需要上车的人数减1
if (index == 0) { // tripped //判断是否是最后一个乘客
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //最后一个乘客上车成功,人满,则执行客车统一的规定
ranAction = true;
nextGeneration(); //唤醒本车所有乘客,并帮助唤来下一辆车。
return 0;
} finally {
if (!ranAction) //如果在执行客车统一任务的时候出了问题,则整趟车标记为broken,唤醒所有乘客并赶下车
breakBarrier();
}
}
//如果上车的不是最后一个乘客
for (;;) {
try {
if (!timed)
trip.await(); //不需要判断睡眠时间,一直睡
else if (nanos > 0L) //设置睡眠时间并睡眠
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { //如果在失眠过程中被中断(这里不是被正常唤醒,是被中断)
if (g == generation && ! g.broken) { //如果没有换车,并且客车也没被标记为broken
breakBarrier(); //则被中断的线程(乘客)负责将该辆车标记为中断
throw ie;
} else {
Thread.currentThread().interrupt(); //如果已经换车,或者被标记为了broken,则保存中断状态,继续后面的执行
}
}
if (g.broken)
throw new BrokenBarrierException(); //被标记为了broken(这可能自己前面标的,也可能其它线程标的),则所有的线程都抛出异常。
if (g != generation) //如果是正常被唤醒,则直接返回还需上车的人(理论上应该是0)
return index;
if (timed && nanos <= 0L) { //如果是应为等待超时,则抛出TimeoutException
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
总结
从代码可以看出,CyclicBarrier的实现是利用条件等待,用到条件等待当然就会用到锁。
多个线程协调过程中,只要有一个线程被中断或者发生异常,则整个协调取消。
CyclicBarrier与CountDownLatch异同点
相同点:
1.都能让多个线程协调,在某一个点上等待
不同点:
1.CyclicBarrier是多个线程自行协同,当线程到达等待数量时自动放行,而CountDownLatch是多个线程阻塞后,需要外界条件达到某种状态的时候才会被统一唤醒,即CyclicBarrier只需要各个线程await,而CountDownLatch还需要额外是countDown。
2.实现上,CyclicBarrier是使用独占锁+Condition实现的,而CountDownLatch是自己实现AQS,利用共享锁的原理实现。
3.CountDownLatch一旦满足条件后需要重新初始化才能再使用,而CyclicBarrier可以循环使用。