CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障 (Barrier)。它要做的事情是,让一组线程到达一个屏障(也 可以叫同步点)时被阻塞,直到最后一个线程到达屏障时, 屏障才会开门,所有被屏障拦截的线程才会继续工作。 CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties), 其参数表示屏障拦截的线程数量,每个线程调用 await 方 法告诉 CyclicBarrier 当前线程已经到达了屏障,然后当前线程被阻塞。
当存在需要所有的子任务都完成时,才执行主任务,这个 时候就可以选择使用 CyclicBarrier
案例
public class DataImportThread extends Thread {
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
@Override
public void run() {
System.out.println("开始导入:path位置的数据");
try {
cyclicBarrier.await();//阻塞
} catch (
InterruptedException e) {
e.printStackTrace();
} catch
(
BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class CycliBarrierDemo extends Thread {
@Override
public void run() {
System.out.println("开始进行数据分析");
}
public static void main(String[] args) {
CyclicBarrier cycliBarrier = new CyclicBarrier(3, new CycliBarrierDemo());
new Thread(new DataImportThread(cycliBarrier, "file 1")).start();
new Thread(new DataImportThread(cycliBarrier, "file 2")).start();
new Thread(new DataImportThread(cycliBarrier, "file 3")).start();
}
}
注意点
1)对于指定计数值 parties,若由于某种原因,没有足够的 线程调用 CyclicBarrier 的 await,则所有调用 await 的线程 都会被阻塞;
2)同样的 CyclicBarrier 也可以调用 await(timeout, unit), 设置超时时间,在设定时间内,如果没有足够线程到达, 则解除阻塞状态,继续工作;
3)通过 reset 重置计数,会使得进入 await 的线程出现 BrokenBarrierException;
4 ) 如 果 采 用 是 CyclicBarrier(int parties, Runnable barrierAction) 构造方法,执行 barrierAction 操作的是最 后一个到达的线程
实现原理
CyclicBarrier 相比 CountDownLatch 来说,要简单很多, 源码实现是基于 ReentrantLock 和 Condition 的组合使用。CyclicBarrier 可以有不止一个栅栏,因为 它的栅栏(Barrier)可以重复使用(Cyclic)
await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
重点说一下:
private final Condition trip = lock.newCondition();
当count未减少到0的时候,使用conidition的trip.await(),当减少到0的时候,首先最后一个线程执行command.run()
,然后执行nextGeneration()唤醒等待的线程,使用的是 trip.signalAll()。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}