前言
CountDownLatch
允许一个或多个线程等待其他线程完成操作.
本文代码地址: 源码下载
例子
package com.sourcecode.concurrencytools;
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
//c.countDown();
/**
* 打开注释 会依次打印1,2,3
* 关闭注释 会依次打印1,2 Main线程会阻塞在await()方法
*/
}
}).start();
c.await();
System.out.println("3");
}
}
可以通过打开注释和关闭注释观察一下各自区别,进而可以简单理解
CountDownLatch
的作用.
实现思路
源码如下: 其实源码(总共也就一百来行)没有太多要分析的,逻辑也非常简单,主要依靠的还是
AQS
.
package com.sourcecode.concurrencytools;
import com.sourcecode.reentrantreadwritelock.AbstractQueuedSynchronizer;
import java.util.concurrent.TimeUnit;
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
// 返回当前AQS的状态值
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
// 其实跟传入的参数acquires没有什么实质的作用
// 根据当前AQS的状态值是否为0,如果为0就获得锁,如果不为0会进入到AQS中的acquireSharedInterruptibly方法中
// 具体的操作需要了解AQS
return (getState() == 0) ? 1 : -1;
}
// 释放 逻辑非常简单
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
思路如下:
1.CountDownLatch c = new CountDownLatch(n)
此时AQS
也就是sync
对象的状态值为n
.
2.c.await()
函数会使任何当前线程阻塞当sync
的状态值不为0
时,所有调用c.await()
方法的线程都会被加入到sync
的同步等待队列中并且节点类型为shared
. 如果sync
的状态值为0
时,c.await()
函数会使不会阻塞,当前线程会正常执行下面的代码.
3.c.countDown()
会使sync
的状态值减1
,如果状态值减为0
的时候,tryReleaseShared
会返回true
,此时会唤醒所有调用c.await()
方法而阻塞的线程.
针对第三点做一点补充说明:看看如何唤醒所有线程的
1.
releaseShared
会调用Sync
父类AbstractQueuedSynchronizer
的releaseShared(int arg)
方法如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2. 调用
Sync
重写父类的tryReleaseShared(arg)
当状态值为0
的时候,该方法会返回true
进而会调用父类中的doReleaseShared()
方法唤醒同步队列中的一个线程.
3. 2中唤醒的线程会从AbstractQueuedSynchronizer
中的doAcquireSharedInterruptibly
中的parkAndCheckInterrupt()
中返回进而通过tryAcquireShared
去尝试获得锁,此时由于当前状态值为0
,会返回1
,表示获得锁,然后调用setHeadAndPropagate(node, r)
(其中r=1
)方法去设置头节点并且尝试去唤醒同步队列后面的线程.
4.setHeadAndPropagate(node, r)
方法在满足以下条件的情况下又会调用doReleaseShared()
从而进入到1.中一步步释放所有由于c.await()
方法而阻塞的线程.
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录一下旧的头节点
setHead(node); // 将当前节点设置为头节点
/**
* 如果propagate > 0 说明锁还可以被别的线程拿到
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
例子2: 关注异常退出
await()
除了上面讲的正常退出外,还有就是在阻塞过程中被别的线程中断的时候也会退出. 如下图所示,先启动一个自定义线程并调用await()
方法并且捕获异常,在主线程中断该线程.
package com.sourcecode.concurrencytools;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTest3 {
static CountDownLatch c = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
Thread thread = new MyThread();
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
//c.countDown();
System.out.println(Thread.currentThread() + "----->finished!");
}
static class MyThread extends Thread {
public void run() {
try {
System.out.println(Thread.currentThread() + "----->before await");
c.await();
System.out.println(Thread.currentThread() + "----->after await");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread() + "----->in interrupted exception.");
}
System.out.println(Thread.currentThread() + "----->finished!");
}
}
}
结果如下: 可以看到当主线程中断线程
thread
时,线程thread
从await()
方法中返回. 至此可以看到await
方法是响应中断的.
Thread[Thread-0,5,main]----->before await
Thread[main,5,main]----->finished!
Thread[Thread-0,5,main]----->in interrupted exception.
Thread[Thread-0,5,main]----->finished!
对于另外一个
await(long timeout, TimeUnit unit)
有三种退出方式: 原理基本上差不多就不重复分析了.
1. 正常退出(当状态值为0
)
2. 中断退出(被其他线程中断)
3. 超时退出(时间超过了预设等待的时间)
参考
1. Java并发编程的艺术