循环屏障,它允许多个线程相互等待到一个障碍点之后才继续执行,可指定等待到屏障之后的执行任务,CyclicBarrier支持循环使用。
功能说明
CyclicBarrier实现了当多个线程达到屏障之后才执行的功能,可指定runable表示达到屏障之后执行的任务。类似于CountDownLatch的功能。但是与CountDownLatch不同的是,CyclicBarrier可以通过reset方法重置进行重复使用,所以Cyclicbarries适合更加复杂的场景,例如可以实现执行错误之后通过reset重试。
CyclicBarrier(int parties, Runnable barrierAction)//屏障数和等待后任务
使用场景
1、 实现多线程任务执行完后的汇总。barrierAction表示汇总操作。
实现关键
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//每次嗲用await都需要获取lock锁
try {
final Generation g = generation;
if (g.broken)//如果当前代broken
throw new BrokenBarrierException();
if (Thread.interrupted()) {//如果线程被中中断,则breakBarriers,屏障将会不可用,除非调用reset开始新代
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 如果当前时最后一个到达屏障的线程,执行runnable,并且开启新的代,唤醒所有等待的线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)//如果runnable执行失败,也会breakBarrier屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//不是最后一个到达屏障的线程,通过condition进入等待,直到被触发、broken、中断、超时
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)//表示开始新的代了
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
1、调用await方法的线程表示线程到达了屏障。
2、是否全部到达屏障通过count来判断,因为涉及到多线程同步问题,所以内部使用了ReenTrantLock来同步代码。
3、如果线程被中断,那么会breakBarrier,屏障不可用了,并且唤醒所有等待的线程。
4、当count为0时,表示全部到达屏障了,将执行设置的runnable。然后开启下一代,唤醒所有等待的线程。如果执行runnable失败,则也会breakBarrier。
5、否则表示不是最后一个到达屏障的线程,会通过condition来堵塞线程。直到最后一个线程执行完之后唤醒或者超时、中断。
总结
CyclicBarrier通过一个count值来表示剩余到达屏障的线程,由于涉及到并发操作,所以通过ReenTrantLock来实现同步,当线程执行await时,如果不是最后一个到达屏障的线程,那么会通过Condition来堵塞线程,当count等于0时,表示最后一个到达屏障的线程,将会唤醒等待中的线程。CyclicBarrier支持循环利用是因为其中引入了一个generation一代的概念,当一代执行完将开启新的代(其实就是将数据重置了)