源码地址:https://github.com/handsomemao/concurrency.git
CountDownLatch
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。CountDownLatch 是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件的发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生,而 await 方法等待计数器达到零,这表示需要等待的事情都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
CountDownLatch只能执行一次,计数值不能被重置,这个是CountDownLatch和CyclicBarrier的区别。
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semphore
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的访问数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟的许可(permit),许可的初始容量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。二值信号量(即初始值为 1 的 Semaphore)可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。因此,它可以用来 实现生产者-消费者模式。
public class SemaphoreExample4 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
CyclicBarrier
我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏和闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用,有兴趣的可以看下 CyclicBarrier example: a parallel sort algorithm。
当现成达到栅栏位置时将调用 await 方法这个方法将阻塞直到所有线程都达到栅栏位置。如果所有线程都达到了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置一遍下次使用。如果对 await 的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是被打破了,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException。如果成功通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号。CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}