CountDownLatch or CyclicBarrier

简介

JDK中提供了一些用于线程之间协同等待的工具类,CountDownLatch和CyclicBarrier就是最典型的两个线程同步辅助类。
<b>CountDownLatch :</b>一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
<b>CyclicBarrier :</b>一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

二者功能实现的区别:

1.CountDownLatch是一次性的,CyclicBarrier可以重设置。
2.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。
3.CyclicBarrier有getNumberWaiting接口返回被阻塞的线程数

二者功能介绍:

<h4>使用CountDownLatch实现:</h4>
1、5个运动员相继都准备就绪
2、教练员响起发令枪
3、运动员起跑
流程图:
<pre>

Paste_Image.png

</pre>
demo code:
<pre>
public class TestCountDownLatch {
private static final int RUNNER_NUMBER = 5; // 运动员个数
private static final Random RANDOM = new Random();
public static void main(String[] args) {
// 用于判断发令之前运动员是否已经完全进入准备状态,需要等待5个运动员,所以参数为5
CountDownLatch readyLatch = new CountDownLatch(RUNNER_NUMBER);
// 用于判断裁判是否已经发令,只需要等待一个裁判,所以参数为1
CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 0; i < RUNNER_NUMBER; i++) {
Thread t = new Thread(new Runner((i + 1) + "号运动员", readyLatch, startLatch));
t.start();
}
try {
readyLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("裁判:所有运动员准备完毕,开始...");
startLatch.countDown();
}
static class Runner implements Runnable {
private CountDownLatch readyLatch;
private CountDownLatch startLatch;
private String name;
public Runner(String name, CountDownLatch readyLatch, CountDownLatch startLatch) {
this.name = name;
this.readyLatch = readyLatch;
this.startLatch = startLatch;
}
public void run() {
int readyTime = RANDOM.nextInt(1000);
try {
Thread.sleep(readyTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我已经准备完毕.");
readyLatch.countDown();
try {
startLatch.await(); // 等待裁判发开始命令
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":开跑...");
}
}
}
---------------------------输出----------------------------------------------------------
3号运动员:我已经准备完毕.
1号运动员:我已经准备完毕.
4号运动员:我已经准备完毕.
5号运动员:我已经准备完毕.
2号运动员:我已经准备完毕.
裁判:所有运动员准备完毕,开始...
3号运动员:开跑...
4号运动员:开跑...
1号运动员:开跑...
2号运动员:开跑...
5号运动员:开跑...
</pre>
<h4>使用CyclicBarrier模拟实现:</h4>
1、前端调用restful api,已知商品id以后,调用后端商品起价接口、商品图片信息接口
2、调用后端商品起价接口、商品图片信息接口
3、在汇总模块中将多个接口的值拼接组合返回给前端
流程图:
<pre>
Paste_Image.png

</pre>
demo code:
<pre>
public class TestCyclicBarrier {
private static final int THREAD_NUMBER = 2;
private static final Random RANDOM = new Random();
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {
public void run() {
System.out.println("2个接口都调用完成进行后续处理。。。");
}
});
for (int i = 0; i < THREAD_NUMBER; i++) {
Thread t = new Thread(new Worker(barrier,i));
t.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
private int apiIndex;
public Worker(CyclicBarrier barrier,int apiIndex) {
this.barrier = barrier;
this.apiIndex = apiIndex;
}
public void run() {
int time = RANDOM.nextInt(1000);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接口" + apiIndex + "调用完成");
try {
barrier.await(); // 等待所有线程都调用过此函数才能进行后续动作
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
---------------------------输出----------------------------------------------------------
接口0调用完成
接口1调用完成
2个接口都调用完成进行后续处理。。。
</pre>
<h4>CountDownLatch源码:</h4>
CountDownLatch.countDown:
<pre>
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试将初始化的state减运算
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed,只执行这里,中间一大段都会被跳过
break;
}
}
</pre>
CountDownLatch.await:
<pre>
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //state状态不为0.既
doAcquireSharedInterruptibly(arg);
}
Node节点的waitStatus取值:
static final int CANCELLED = 1; //节点因为超时或者中断被取消。该状态不会再发生变,而且被取消节点对应的线程不会再发生阻塞。
static final int SIGNAL = -1; //后继节点将被或者已经被阻塞,所以当前节点在释放或者取消时,需要unpark它的后继节点。
static final int CONDITION = -2; //该状态仅供在条件队列中的节点使用。当该节点转移到同步队列中时,该状态将被设置为0。
static final int PROPAGATE = -3; //仅在共享模式下使用。在doReleaseShared()方法中,仅仅会设置头节点的状态为PROPAGATE。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //共享的形式
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //前一个节点
if (p == head) {
int r = tryAcquireShared(arg); //判断state值
if (r >= 0) {
setHeadAndPropagate(node, r); //设置头结点为node,且最终设置头结点的waitStatus为Node.PROPAGATE且唤醒node后面的状态<0的某个结点(如果有的话)
p.next = null; // help GC
failed = false;
return; //结束阻塞
}
}
if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 判断前一个结点的状态来确定结点是否应该被阻塞
parkAndCheckInterrupt()) //阻塞且判断是否被中断,如果被中断则抛异常中断
throw new InterruptedException();
}
} finally {
if (failed) //异常的话acquire失败,在阻塞队列中取消node,如果node为头结点的话,且唤醒node后面的状态<0的某个结点(如果有的话)
cancelAcquire(node);
}
}
</pre>
<h4>CyclicBarrier源码:</h4>
CyclicBarrier:因为这个类代码量比较少全局分析一下:
<pre>
public class CyclicBarrier {
private static class Generation {
boolean broken = false; //当前代被中止
}
private final ReentrantLock lock = new ReentrantLock(); //lock
private final Condition trip = lock.newCondition();
private final int parties; //屏障需要拦截的任务数
private final Runnable barrierCommand; //线程都执行结束到这个屏障以后执行执行的任务
private Generation generation = new Generation(); //当前代,主要是为了reset的时候预留
private int count; //用于统计剩余任务数,初始化的时候=parties。为0的时候结束释放锁,lock.unlock();
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //通知所有被阻塞的线程
// set up next generation
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true; //中断
count = parties; //count被重置
trip.signalAll();
}
private int dowait(boolean timed, long nanos) //可以定义是否带超时的等待
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) //屏障被中断
throw new BrokenBarrierException();
if (Thread.interrupted()) { //当前线程被中断
breakBarrier(); //唤醒所有
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //如果barrierCommand不为空的话执行任务,不是start重启一个线程
ranAction = true;
nextGeneration(); //notice all 且更新换代
return 0;
} finally {
if (!ranAction)
breakBarrier(); //如果执行barrierCommand失败的话,唤醒所有wait的线程
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;)
try {
if (!timed) //如果没有设置wait时间话
trip.await(); //阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); //设置等待时间
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { //未中断
breakBarrier();//则重新中断
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) { //初始化
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) { //只有线程数,没有task,表示到达barrier后什么也不做
this(parties, null);
}
public int getParties() {
return parties;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit) //设置阻塞的最大时间
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() { //返回屏障的阻塞状态,感觉用ReentrantReadWriteLock 的readLock效率更高
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() { //重置
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;//返回被阻塞的线程数
} finally {
lock.unlock();
}
}
}
</pre>
<b>思考:</b>
1、CountDownLatch和CyclicBarrier都是通过设置的阻塞任务数减操作,而不是网上有些人说的CyclicBarrier就是通过count加知道任务数(JDK 1.7)
2、CountDownLatch更多是直接结合AQS来做阻塞使用的是共享锁,而CyclicBarrier是直接用ReentrantLock来实现使用的是排它锁,虽然ReentrantLock也是使用了AQS来实现,
3、CyclicBarrier的代码结构看起来更简单清晰,CountDownLatch用的很多基础的AQS的方法
4、CountDownLatch偏向于计数,CyclicBarrier可以指定栅栏结束以后的任务也方便重置当前栅栏

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容