在java的并发包中为我们提供了三种并发控制的手段,他们分别是CountDownLatch|CyclicBarrier|Semaphore,今天我们分别来讲解一下这三种方法以及使用场景
CountDownLatch
- 特点:
- 首先设置一个计数器,在线程完成数达到计数器阈值之前,其他的一个或多个线程都会等待。
- 计数器无法重置,一旦到达阈值,就会释放所有线程
- 一个线程等待其他线程,全部完成后才返回
- 适用场景:
- 实现并行(可以用来模拟高并发的操作)
- 开始执行任务前先完成前置的一些任务
- 检测死锁
- 使用:
- 当每次线程完成时在执行countDown()使计数器减1,当执行线程数达到计数器阈值时即会自动释放所有等待的线程
- demo:
public class CountDownLatchDemo {
// 线程数量
private static int THREAD_COUNT = 10;
// 初始化,设置计数器大小
private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
// 创建线程池,大小与计数器一致,不一致则无法实现一次并发所有请求
private static ExecutorService ex = Executors.newFixedThreadPool(THREAD_COUNT);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
Runnable runnable = () -> {
try {
Thread.sleep(1000);
System.out.println("Thread: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
} finally {
// 一个线程执行完毕,计数器减1
countDownLatch.countDown();
}
};
// 线程池执行线程
ex.execute(runnable);
}
// 关闭线程池
ex.shutdown();
}
}
after 1s output:
Thread: pool-1-thread-8
Thread: pool-1-thread-3
Thread: pool-1-thread-7
Thread: pool-1-thread-10
Thread: pool-1-thread-5
Thread: pool-1-thread-2
Thread: pool-1-thread-6
Thread: pool-1-thread-4
Thread: pool-1-thread-9
Thread: pool-1-thread-1
CyclicBarrier
- 特点:
- 设置屏障,当一组线程到达屏障时被阻塞,待所有线程到达时,取消所有阻塞。
- 适用场景:
- 需要所有子任务完成时才执行主任务(类似CountDownLatch)
- 多线程计算,最后合并计算结果
- 多个线程互相等待,全部完成后才返回
- 使用:
- 每次执行完线程,调用await()进入屏障中等待
- 当所有线程都到达屏障时,自动取消所有阻塞
- demo:
设置多名学生,求多个学生的平均成绩,如果使用多线程计算每个学生的成绩,则需要等待所有学生的成绩计算完毕,才能求平均值,所以可以使用CyclicBarrier
public class CyclicBarrierDemo implements Runnable {
// 设置线程池数量
private final int THREAD_COUNT = 5;
// 创建屏障数量,屏障全部到达后执行当前类的run方法
private CyclicBarrier cyclicBarrier = new CyclicBarrier(5, this);
// 创建线程池
private Executor executor = Executors.newFixedThreadPool(THREAD_COUNT);
// 保存每个人的成绩
private ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
public static void main(String[] args) {
CyclicBarrierDemo c = new CyclicBarrierDemo();
c.count();
}
private void count() {
Random random = new Random();
for (int i = 0; i < 5; i++) {
Runnable runnable = () -> {
// 每个学生的随机成绩
int temp = random.nextInt(100);
scores.put(Thread.currentThread().getName(), temp);
System.out.println(temp);
try {
// 进入屏障,等待其他学生计算完毕
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
executor.execute(runnable);
}
}
@Override
public void run() {
int result = 0;
// 求平均值
for (Map.Entry<String, Integer> out : scores.entrySet()) {
result += out.getValue();
}
System.out.println("avg: " + result / THREAD_COUNT);
}
}
output:
4
11
76
98
37
avg: 45
Semaphore
- 特点:
- 设置同一时间被访问的线程数量,可以实现流量控制
- 适用场景:
- 限流,限制每次运行的线程数量
- 使用:
- 线程开始时调用acquire()使用一个信号量
- 线程结束时调用semaphore.release()释放一个信号量
- 当acquire次数 - release次数 > THREAD_COUNT时,则无法再开始新的线程
- demo:
public class SemaphoreDemo {
// 线程数量
private static int THREAD_COUNT = 10;
// 设置线程池大小
private static final ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT);
// 设置信号量的数量
private static final Semaphore semaphore = new Semaphore(THREAD_COUNT);
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
final int No = i;
Runnable runnable = () -> {
try {
semaphore.acquire();
System.out.println("Get: " + No);
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
semaphore.release();
}
};
exec.execute(runnable);
}
exec.shutdown();
}
}
every 5s output 5:
Get: 1
Get: 3
Get: 4
Get: 2
Get: 0
Get: 5
Get: 6
Get: 7
Get: 8
Get: 9
Get: 10
Get: 12
Get: 11
Get: 13
Get: 17
Get: 15
Get: 14
Get: 16
Get: 19
Get: 18
Get: 20
Get: 22
Get: 21
Get: 23
Get: 24
Get: 26
Get: 25
Get: 29
Get: 28
Get: 27
Get: 30
Get: 33
Get: 36
Get: 38
Get: 39
Get: 32
Get: 31
Get: 37
Get: 35
Get: 34
Get: 40
Get: 45
Get: 49
Get: 44
Get: 43
Get: 42
Get: 41
Get: 48
Get: 47
Get: 46
三种工具的使用方法就介绍的这里,源码分析待日后分析。。。。
每一行代码,都是浩瀚星辰的点点繁星