Java并发编程源码分析系列:
前两篇分别通过ReentrantLock和CountDownLatch分析了AQS的独占功能和共享功能。除CountDownLatch之外,还有Semaphore和CyclicBarrier,前者类似CountDownLatch使用AQS实现,后者使用ReentrantLock实现,本文分析一下。
信号量Semaphore
直接用个小例子描述信号量的用法:
final Semaphore semaphore = new Semaphore(2);
for (int studentIndex = 0; studentIndex < 5; studentIndex++) {
final int finalI = studentIndex;
new Thread(() -> {
try {
semaphore.acquire(); //如果没有许可,阻塞
System.out.println("student " + finalI + " read book");
Thread.sleep(new Random().nextInt(10000) + 1000);
System.out.println("student " + finalI + " finish read book");
semaphore.release(); //释放许可
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
5名学生阅读2本书,因为书的数量是固定的,所以只有2名学生可以持有书阅读,其他人需要等别人看完再看。程序执行结果如下:
student 0 read book
student 1 read book
student 1 finish read book
student 2 read book
student 0 finish read book
student 3 read book
student 2 finish read book
student 4 read book
student 3 finish read book
student 4 finish read book
信号量控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。
信号量有许可的概念,初始许可数量在构造Semaphore时确定。许可还有剩余时,通过acquire获取许可;操作完成后,通过release释放许可。如果没有许可,acquire将阻塞直到有许可。
Semaphore使用起来非常简单,下面来看源码。
Semaphore构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
和ReentrantLock类似,Semaphore也分为公平和非公平两种。permits是许可数量,最终传递给state,此时state的含义就是许可的数量。
获取许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Semaphore的acquire调用的是acquireSharedInterruptibly,和CountDownLatch的await一样,差异的地方是tryAcquireShared方法是由子类实现。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平信号量和非公平信号量分别调用tryAcquireShared和nonfairTryAcquireShared,目的都是将state减一,以此尝试获取许可。不同之处是公平信号量多调用了hasQueuedPredecessors,判断是否有比自己先的线程在等待;非公平信号量就不管排队,先试了再说。
释放许可
对于release操作,还是和CountDownloadLatch一样调用releaseShared。到这里,应该对AQS的使用非常了解,直接来看tryReleaseShared。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
release是acquire的反操作,将许可加一。
总结下来,Semaphore和CountDownLatch都是依赖AQS实现,两者只有少少不同。
栅栏CyclicBarrier
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("all threads through barrier"));
for (int i = 0; i < 5; i++) {
final int finalI = i + 1;
new Thread(() -> {
System.out.println("thread " + finalI + " is started");
Random random = new Random();
try {
Thread.sleep(random.nextInt(10000) + 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread " + finalI + " has been completed");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("thread " + finalI + " to continue");
}).start();
}
CyclicBarrier设立一个barrier,只有当所有线程到达barrier的位置,才能继续执行下去。打个简单比喻,只有当所有人到达指定地方集中,才能开车出发。
thread 1 is started
thread 4 is started
thread 3 is started
thread 5 is started
thread 2 is started
thread 2 has been completed
thread 3 has been completed
thread 5 has been completed
thread 4 has been completed
thread 1 has been completed
all threads through barrier
CyclicBarrier的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- parties表示barrier开启需要到达的线程数量;
- count表示等待到达barrier的线程数量;
- barrierCommand表示开启barrier后执行的操作;
CyclicBarrier的构造函数设置了参数,没有什么特别,下面来看核心方法await。await分为限时和不限时两个版本,最终都调用了dowait。在开始研究dowait代码之前,先要了解几点。
线程什么时候通过Barrier?
当三种条件之一发生时,线程才能继续执行:
- 当parties个线程到达barrier;
- 当前线程被中断,抛出InterruptedException;
- 超时,抛出BrokenBarrierException。
显式条件Condition
dowait的实现使用了ReentrantLock和Condition。ReentrantLock前文介绍过,它是synchronized的高级版本。Object的wait、notify、notifyAll大家都知道,它和synchronized打包实现“条件不满足时,线程等待;条件满足时,等待该条件的线程被唤醒”的功能。Condition是它的高级版本,和ReentrantLock一起可以更加灵活精细地实现这种功能。Condition不是本文的重点,后续会开新篇讲解,现在先当成wait和notify理解。
Generation对象
private Generation generation = new Generation();
private static class Generation {
boolean broken = false;
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
Generation是一代的意思,唯一记录了barrier是否broken。看CyclicBarrier的名字也知道,它是可重复使用的,每次使用CyclicBarrier,本次所有线程同属于一代,即同一个Generation。当parties个线程到达barrier时,需要调用nextGeneration更新换代。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
barrier被broken后,调用breakBarrier方法,将generation.broken设置为true,并使用signalAll通知所有等待的线程。
dowait
现在正式看dowait的代码,代码有点长,但有了前面的铺垫,看起来就简单多了。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//1
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//2
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//3
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
dowait一开始就获取ReentrantLock并锁定,然后在标记1,判断Generation是否broken,再判断线程是否被中断。
标记2,将等待的线程数量减1,如果count正好为零,说明parties个线程到达barrier了。执行预定的Runnable任务后,更新换代,准备下一次使用。
标记3的for循环里,通过标记timed,根据是否限时,线程调用await或者awaitNanos进入等待。其他代码很好理解,就不多说了。
后记
理解AQS之后,研究Semaphore易如反掌,CyclicBarrier也没什么难的,只有一个新东西Condition,下一篇研究。