开篇
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。
CyclicBarrier用法demo
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
CyclicBarrier类定义
- parties记录一共等待执行个数,count记录依然等待执行的个数。
- barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
- Generation的代的概念来实现CyclicBarrier的复用。
- 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier {
// 代的类定义
private static class Generation {
boolean broken = false;
}
// 内部通过ReentrantLock实现线程安全的等待
private final ReentrantLock lock = new ReentrantLock();
// 内部通过Lock的condition实现所有waiter的信号通知
private final Condition trip = lock.newCondition();
// 所有等待执行的个数
private final int parties;
// 所有等待线程都完成任务后由最后一个线程执行的命令
private final Runnable barrierCommand;
// 通过代的概念实现复用
private Generation generation = new Generation();
// 还在等待的个数
private int count;
// 核心构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier工作原理
- CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
- 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
- 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
- 线程的休眠和唤醒都是基于ReentrantLock来实现的
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 通过lock来保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 判断generation过期的情况
if (g.broken)
throw new BrokenBarrierException();
// 判断线程中断情况
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 递减待执行的个数计数
int index = --count;
// 所有待执行任务完成后执行barrierCommand命令
if (index == 0) { // tripped
boolean ranAction = false;
try {
// barrierCommand命令不为null的时候执行该命令
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 已经执行了barrierCommand
ranAction = true;
// 重置generation用以复用并且唤醒所有等待的线程
// private void nextGeneration() {
// trip.signalAll();
// count = parties;
// generation = new Generation();
// }
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果count的值不为0,那么当前线程就开始进入等待
// 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
for (;;) {
try {
if (!timed)
// private final Condition trip = lock.newCondition();
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 各种后置处理逻辑
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}