Semaphore是啥?
贴个官方解释:
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,其维护了一个许可证集合,有多少资源限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
举例
举个生活中的例子:
某餐厅只有3张餐桌,而当前需要接待的客人有6桌,那么就需要领号排队,结束一桌进去一桌,此处餐桌就相当于Semaphore的许可证。
通过代码实现上述过程:
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(3);
for(int i=0; i<6; i++){
Runnable runnable = new Runnable() {
@Override
public void run() {
try{
// 获取令牌
System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + "尝试获取令牌");
semaphore.acquire();
System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + "获取到令牌,开始就餐");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + "就餐完毕,随后释放令牌");
semaphore.release();
}catch (InterruptedException ie){
ie.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
点击运行:
1595402085297: pool-1-thread-1尝试获取令牌
1595402085297: pool-1-thread-2尝试获取令牌
1595402085297: pool-1-thread-1获取到令牌,开始就餐
1595402085297: pool-1-thread-2获取到令牌,开始就餐
1595402085298: pool-1-thread-3尝试获取令牌
1595402085298: pool-1-thread-3获取到令牌,开始就餐
1595402085298: pool-1-thread-4尝试获取令牌
1595402085298: pool-1-thread-5尝试获取令牌
1595402085299: pool-1-thread-6尝试获取令牌
1595402088192: pool-1-thread-3就餐完毕,随后释放令牌
1595402088192: pool-1-thread-4获取到令牌,开始就餐
1595402091469: pool-1-thread-4就餐完毕,随后释放令牌
1595402091469: pool-1-thread-5获取到令牌,开始就餐
1595402091504: pool-1-thread-5就餐完毕,随后释放令牌
1595402091504: pool-1-thread-6获取到令牌,开始就餐
1595402092173: pool-1-thread-1就餐完毕,随后释放令牌
1595402094097: pool-1-thread-2就餐完毕,随后释放令牌
1595402094502: pool-1-thread-6就餐完毕,随后释放令牌
源码剖析
Semaphore是基于AQS共享锁来实现的,先看一下其构造函数:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
可以看到,真正起作用的是NonfairSync和FairSync内部类,分别对应非公平锁和公平锁。
NonfairSync和FairSync均是Sync的继承类。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造函数传入令牌数
Sync(int permits) {
setState(permits);
}
// 获取令牌数
final int getPermits() {
return getState();
}
// 定义非公平版的TryAcquireShared方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前资源值
int available = getState();
// CAS尝试释放资源acquires
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// CAS释放共享锁
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 减少令牌数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 将令牌数归零
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
先看非公平版的Sync:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 覆写AQS的tryAcquireShared方法,本质上调用的是Sync的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
接着看公平版的Sync:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
可以发现,FairSync的tryAcquireShared方法与Sync的nonfairTryAcquireShared方法相比,只是多了以下代码:
if (hasQueuedPredecessors())
return -1;
当线程尝试获取共享锁时,首先判断AQS同步队列中是否有等待的线程,若有等待的线程,则直接返回-1。
接着看一下Semaphore如何获取令牌:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
最终调用的Sync的acquireSharedInterruptibly的方法,acquireSharedInterruptibly是AQS中的方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
非公平锁模式下,tryAcquireShared方法会直接CAS尝试更新state获取共享锁,不管AQS同步队列中是否有阻塞线程,仅有当前资源为0时,tryAcquireShared(1)才会返回-1,进入到doAcquireSharedInterruptibly方法中。
而公平锁模式下,tryAcquireShared方法在CAS尝试更新state获取共享锁前,先检查AQS同步队列中是否有阻塞线程,若有,则直接返回-1,继续执行doAcquireSharedInterruptibly方法将当前线程添加到AQS同步队列中。
释放令牌的话:
public void release() {
sync.releaseShared(1);
}
最终调用AQS的releaseShared方法。
所以,好好研究AQS,只要AQS理解透彻了,Java的这些并发工具类也就通透了。