CyclicBarrier是JDK8提供的一个多线程同步工具,用于实现一组线程在某个点上等待,直到所有线程都到达该点后再一起继续执行。它基于"栅栏"(Barrier)的概念,通过一个计数器来实现线程的同步。在这篇文章中,我们将介绍CyclicBarrier的实现原理、基础用法,并提供代码示例。
CyclicBarrier的实现原理
CyclicBarrier内部使用了ReentrantLock和Condition来实现线程的等待和唤醒机制。其基本原理如下:
- CyclicBarrier通过构造函数指定参与同步的线程数量,称为parties。当所有parties数量的线程都调用了await()方法后,CyclicBarrier会将计数器重置为初始值,并唤醒所有在等待的线程。
- 每个线程在调用await()方法时,会将自己加入等待队列,并释放持有的锁。
- 当最后一个线程调用了await()方法后,会唤醒所有在等待队列中的线程,使得它们可以继续执行。
- CyclicBarrier还可以设置一个可选的回调函数(Runnable),在所有线程都到达栅栏点后执行。
CyclicBarrier的基础用法
CyclicBarrier的基础用法包括以下几个步骤:
1.创建CyclicBarrier实例,并指定参与同步的线程数量。例如:
1CyclicBarrier barrier = new CyclicBarrier(3);
这里创建了一个CyclicBarrier实例,指定了参与同步的线程数量为3。
2.在每个线程中调用await()方法,使线程等待。例如:
1try {
2 System.out.println("Thread " + Thread.currentThread().getId() + " is waiting at the barrier.");
3 barrier.await(); // 等待所有线程到达栅栏点
4 System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier.");
5} catch (InterruptedException | BrokenBarrierException e) {
6 e.printStackTrace();
7}
这里通过调用await()方法使线程等待,直到所有参与同步的线程都到达栅栏点。当所有线程都调用了await()方法后,会唤醒它们,使得它们可以继续执行。
3.可选的回调函数。在创建CyclicBarrier实例时,可以传入一个可选的回调函数(Runnable),在所有线程都到达栅栏点后执行。例如:
1CyclicBarrier barrier = new CyclicBarrier(3, () -> {
2 System.out.println("All threads have reached the barrier.");
3 // 可选的回调函数,会在所有线程到达栅栏点后执行
4});
这里定义了一个回调函数,当所有线程都到达栅栏点后会执行。
4.重置栅栏:
1// 可以使用reset()方法来重置栅栏,将计数器重新设置为初始值
2barrier.reset();
这样可以使得之前等待的线程重新进入等待状态,可以用于多次复用栅栏。
CyclicBarrier的代码示例
下面是一个简单的CyclicBarrier的代码示例,演示了三个线程在栅栏点处等待,并在所有线程都到达后执行回调函数:
1import java.util.concurrent.BrokenBarrierException;
2import java.util.concurrent.CyclicBarrier;
3
4public class CyclicBarrierExample {
5
6 public static void main(String[] args) {
7 // 创建CyclicBarrier实例,指定参与同步的线程数量为3,并定义回调函数
8 CyclicBarrier barrier = new CyclicBarrier(3, () -> {
9 System.out.println("All threads have reached the barrier.");
10 });
11
12 // 创建并启动三个线程
13 Thread t1 = new Thread(new Worker(barrier));
14 Thread t2 = new Thread(new Worker(barrier));
15 Thread t3 = new Thread(new Worker(barrier));
16
17 t1.start();
18 t2.start();
19 t3.start();
20
21 try {
22 t1.join();
23 t2.join();
24 t3.join();
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 }
28 }
29
30 static class Worker implements Runnable {
31 private CyclicBarrier barrier;
32
33 public Worker(CyclicBarrier barrier) {
34 this.barrier = barrier;
35 }
36
37 @Override
38 public void run() {
39 try {
40 System.out.println("Thread " + Thread.currentThread().getId() + " is working.");
41 Thread.sleep(2000); // 模拟工作耗时
42 System.out.println("Thread " + Thread.currentThread().getId() + " is waiting at the barrier.");
43 barrier.await(); // 等待所有线程到达栅栏点
44 System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier.");
45 } catch (InterruptedException | BrokenBarrierException e) {
46 e.printStackTrace();
47 }
48 }
49 }
50}
在上面的示例中,我们创建了一个包含3个线程的CyclicBarrier实例,并在每个线程中调用了await()方法使其等待。当所有线程都到达栅栏点后,会执行定义的回调函数并输出相应的信息。
总结:
CyclicBarrier是一种实现多线程同步的机制,通过栅栏点来等待多个线程到达,并在所有线程都到达后执行回调函数。它的使用可以帮助我们实现一些需要多线程协作的场景,如分布式计算、多线程数据处理等。在使用CyclicBarrier时,需要注意线程数量的设置、await()方法的调用和可选的回调函数的使用。希望这篇文章能够对你理解CyclicBarrier的实现原理和基础用法有所帮助。