CyclicBarrier 也是 AQS 的同步组件
CyclicBarrier
CyclicBarrier
也是一个同步辅助类 , 它允许一组线程相互等待 , 直到到达某个公共的屏障点 , 通过它可以完成多个线程之间相互等待 ,只有当每个线程都准备好之后, 才能各自继续往下执行后续的操作, 和 CountDownLatch
相似的地方就是, 它也是通过计数器来实现的. 当某个线程调用了 await()
方法之后, 该线程就进入了等待状态 . 而且计数器就进行 +1
操作 , 当计数器的值达到了我们设置的初始值的时候 , 之前调用了await()
方法而进入等待状态的线程会被唤醒继续执行后续的操作. 因为 CyclicBarrier
释放线程之后可以重用, 所以又称之为循环屏障 . CyclicBarrier
使用场景和 CountDownLatch
很相似 , 可以用于多线程计算数据, 最后合并计算结果的应用场景 .
CyclicBarrier 与 CountDownLatch 区别
CountDownLatch
的计数器只能使用一次 , 而CyclicBarrier
的计数器可以使用reset
重置 循环使用-
CountDownLatch
主要事项 1 个 或者 n 个线程需要等待其它线程完成某项操作之后才能继续往下执行 , 其描述的是 1 个 或者 n 个线程与其它线程的关系 ;CyclicBarrier
主要是实现了 1 个或者多个线程之间相互等待,直到所有的线程都满足条件之后, 才执行后续的操作 , 其描述的是内部各个线程相互等待的关系 .CyclicBarrier
假如有 5 个线程都调用了await()
方法 , 那这个 5 个线程就等着 , 当这 5 个线程都准备好之后, 它们有各自往下继续执行 , 如果这 5 个线程在后续有一个计算发生错误了 , 这里可以重置计数器 , 并让这 5 个线程再执行一遍 .
CyclicBarrier 代码演示
- 演示代码1
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by Charles
*/
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5); // 5 个线程同步等待
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await(); // 有一个线程准备 ok 了 , 当达到上面设置的5个线程 的时候 , 后续代码就开始执行了
log.info("{} continue", threadNum);
}
}
控制台输出
- 演示代码 2 (带有等待时间)
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* Created by Charles
*/
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS); //等待2000 毫秒后继续执行
} catch (BrokenBarrierException | TimeoutException e) {
log.warn("BarrierException",e);
}
log.info("{} continue", threadNum);
}
}
控制台输出结果如下图 :