1、CountDownLatch实现
CountDownLatch的实现基于AQS的共享模式,其Sync实现如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始的count值,state中保存此count值
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//尝试获取共享锁,只有当state为0时,即计数值为0时才能获取到共享锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//尝试释放共享锁,通过CAS的方式将state即count值减一
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch总体流程分三部:
锁计数值初始化:即初始化state值为锁的count值;
await过程:await时会调用tryAcquireShared()获取共享锁,若此时state值为大于0,则会将当前线程节点插入同步队列尾部,并阻塞当前线程;
countDown过程:countDown时会tryReleaseShared()无阻塞地减少锁计数值,当锁计数值减少到0时,就会唤醒同步队列中阻塞的线程节点。
2、CyclicBarrier实现
CyclicBarrier是基于ReentrantLock和Condition实现的锁工具。
实现源码:
//构造函数,parties为初始化的计数器,barrierAction为当parties计数减到0时的回调
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
//初始计数
this.parties = parties;
//本轮计数值
this.count = parties;
//计数为0时的回调
this.barrierCommand = barrierAction;
}
//等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//h获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前分代
final Generation g = generation;
//此次分代已被停止?
if (g.broken)
throw new BrokenBarrierException();
//当前线程被终止?
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//分代计数值减1,若计数值减到0,则运行回调,并调用nextGeneration()
//唤醒所有在等待的线程,同时更新分代信息
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//计数值未减到0,则需要等待
for (;;) {
try {
//无时间条件等待
if (!timed)
trip.await();
//设置了超时时间等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//等待的过程中分代已经停止,则抛出异常
//原可能是某个线程在等待的过程中线程被中断
if (g.broken)
throw new BrokenBarrierException();
//分代已经更新?说明是被signal信号唤醒的
if (g != generation)
return index;
//等待超时?
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3、Semaphore实现
Semaphore的实现是基于AQS的共享锁,有公平和非公平两种模式。
基本的Sync实现:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//初始的许可数量,同步锁的state保存许可的数量
Sync(int permits) {
setState(permits);
}
//获取许可数量
final int getPermits() {
return getState();
}
//非公平的方式获取共享锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//可用许可数减去需求的许可数
int remaining = available - acquires;
//许可数大于0时,CAS获取许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//释放许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//许可数加锁释放的许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS更新许可,直到成功
if (compareAndSetState(current, next))
return true;
}
}
//减少许可
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//
if (compareAndSetState(current, next))
return;
}
}
//将许可消耗完
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
公平的FairSync实现::
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//公平方式获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
//当前节点有前驱节点,则获取失败
if (hasQueuedPredecessors())
return -1;
//无前驱节点时,CAS方式获取许可
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
非公平的Sync实现:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//直接使用Sync的nonfairTryAcquireShared()实现,非公平方式获取许可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
相关对外接口实现:
//信号量初始化,permits:初始许可数;fire:是否公平
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//获取许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//释放许可
public void release() {
sync.releaseShared(1);
}
4、特性对比
区别:
CountDownLatch 使一个线程A或是组线程A等待其它线程执行完毕后,线程A或是组线程A才继续执行。CyclicBarrier:一组线程使用await()指定barrier,所有线程都到达各自的barrier后,再同时执行各自barrier下面的代码。Semaphore:是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
CountDownLatch是减计数方式,计数==0时释放所有等待的线程;CyclicBarrier是加计数方式,计数达到构造方法中参数指定的值时释放所有等待的线程。Semaphore,每次semaphore.acquire(),获取一个资源,每次semaphore.acquire(n),获取n个资源,当达到semaphore 指定资源数量时就不能再访问线程处于阻塞,必须等其它线程释放资源,semaphore.relase()每次资源一个资源,semaphore.relase(n)每次资源n个资源。
CountDownLatch当计数到0时,计数无法被重置;CyclicBarrier计数达到指定值时,计数置为0重新开始。
CountDownLatch每次调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响;CyclicBarrier只有一个await()方法,调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞。
CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。CountDownLatch、CyclikBarrier这个值作为计数用,达到该次数即释放等待的线程,而Semaphore 中所有acquire获取到的资源达到这个数,会使得其它线程阻塞。
共同:
CountDownLatch与CyclikBarrier两者的共同点是都具有await()方法,并且执行此方法会引起线程的阻塞,达到某种条件才能继续执行(这种条件也是两者的不同)。Semaphore,acquire方获取的资源达到最大数量时,线程再次acquire获取资源时,也会使线程处于阻塞状态。CountDownLatch与CyclikBarrier两者的共同点是都具有await()方法,并且执行此方法会引起线程的阻塞,达到某种条件才能继续执行(这种条件也是两者的不同)。Semaphore,acquire方获取的资源达到最大数量时,线程再次acquire获取资源时,也会使线程处于阻塞状态。CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。
CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。
CountdownLatch和CyclicBarrier的区别:
CountDownLatch简单的说就是一个线程等待,直到他所等待的其他线程都执行完成并且调用countDown()方法发出通知后,当前线程才可以继续执行。
CyclicBarrier是所有线程都进行等待,直到所有线程都准备好进入await()方法之后,所有线程同时开始执行!
CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
Semaphore:
Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。