CountDownLatch、Semaphore和CyclicBarrier都属于JUC(java.util.concurrent)中的类,实现原理也离不开AQS。
CountDownLatch
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到countdown是倒数的意思,类似于我们倒计时的概念。
CountDownLatch提供了两个方法,一个是countDown,一个是await, CountDownLatch初始化的时候需要传入一个整数,在这个整数倒数到0之前,调用了await方法的程序都必须要等待,然后通过countDown来倒数。下面是一个使用CountDownLatch的demo:
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(3);
new Thread(()->{
countDownLatch.countDown();
},"t1").start();
new Thread(()->{
countDownLatch.countDown();
},"t2").start();
new Thread(()->{
countDownLatch.countDown();
},"t3").start();
countDownLatch.await();
System.out.println("所有线程执行完毕");
}
await:调用内部类Sync的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //判断线程是否中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //如果等于0则返回1,否则返回-1,返回-1表示需要阻塞
doAcquireSharedInterruptibly(arg);
}
Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
可以把它比作是控制流量的红绿灯。比如××马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入××马路,但是如果前一百辆中有5辆车已经离开了××马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。下面是一个使用Semaphore的demo:
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i< THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。可以使用它来实现并发测试:
public class CyclicBarrierDemo {
public void test(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i=0;i<10;i++){
executorService.execute(new MyRunable(cyclicBarrier));
}
executorService.shutdown();
while (!executorService.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class MyRunable implements Runnable{
private CyclicBarrier cyclicBarrier;
public MyRunable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}