semaphore中文翻译为信号标,有些大哥习惯叫它信号量。其本质我的理解就是一个许可发放器。
正值疫情期间公园限流就是一个很好的场景来解释这件事。为了落实疫情防控的有关工作,公园管理员A(大哥大)做了100个小纸牌,给了管理员B,用于发放给游客进入公园使用,游客进公园需要持该小纸牌进入并游玩,并要求游客出门口时候要交回这个小纸,很快100个就发往完了,这时候来了第101位游客,结果发现没有纸牌可用,因此就给他搬了个马扎坐在了阴凉处歇会喝喝茶。有点像限流算法里面令牌桶的赶脚,这不是重点。
下面看下semaphore的具体实现吧
###首先看到的是 sync类这是并发包里面很多类都有的各自实现,
##李大爷通过模板模式将公用部分封装到AbstractQueuedSynchronizer
##其他的竞态抢夺策略则灵活的交给各个并发类进行实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
##初始化令牌数量,对应案例中的100个小纸牌被初始化出来
Sync(int permits) {
setState(permits);
}
##读取目前剩余量
final int getPermits() {
return getState();
}
#非公平的竞态获取方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
#死循环进行调用尝试通过cas原子操作刷新剩余小纸牌数量
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
##如果剩余数量小于零就直接返回负数传递给AQS父类进行排队,
##如果cas成功了则返回剩余数量
return remaining;
}
}
#游园结束归还小纸牌
protected final boolean tryReleaseShared(int releases) {
for (;;) {
#通过死循环的形式进行归还
int current = getState();
int next = current + releases;
if (next < current) // overflow
#超出最大数量这个一般不会发生比如超了int最大范围了
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
#减少目前令牌数量,管理员B尝试扣下来一些小纸牌,
#这个过要原子性完成的
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
#清空目前许可,类似于暴力作废所有小纸牌,把小纸牌清零了,不再往外发放
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
接下来是两个竞争方式的代码实现
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
#初始化许可数
NonfairSync(int permits) {
super(permits);
}
#调用不公平方法实现竞态资源获取
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
这里啰嗦两句公平跟不公平其实就是在竞态资源时候的一些区别,不公平我实现逻辑就是上来就抢,不会检查队列是否有排队的,抢不到就再去排队,而公平的实现逻辑则是上来先检查一下目前同步器队列中有没有排队的node节点,如果已经存在了则老老实实去排队
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
#检查是否有排队的如果有则返回负值,告诉AQS也就是当前类的爷爷类,去安排当前线程去排队吧
if (hasQueuedPredecessors())
return -1;
#如果没有排队的线程尝试修改目前许可数量修改成功则继续执行当前线程
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
这里对公平锁与非公平锁的
tryAcquireShared((int acquires)进行一个简单说明,我们在阅读并发里面相关代码时候会经常看到这个名称的方法,这个方法设计的灵魂是李大爷在AQS类里面做了抽象,因为线程的排队跟调度不希望被子类去实现代码也不够优雅,因此李大爷在AQS里面定义了方法进行占位,当然类似的还有tryAcquire(int arg) 他们的职责其实都是要给AQS一个反馈,目的在于告诉AQS我是让他去尝试排队呢,还是直接放行呢,放行的话就需要子类在执行时候能够正常获得竞争状态,也就是往往能够按照预期通过CAS操作修改state变量的值(这里之所以是预期是因为不同的锁表现出不同的形式)。比如重入锁要求只有是0才能尝试加锁,如果是大于0的其实是被其他线程占有了那么就返回负值,如果是本文信号量则要求state得大于等于零且修改成功了返回大于等于零就能让当前线程获得执行权力。还有可重入读写锁,通过划分state的范围,每次通过掩码计算。
剩下的一些方法都是简单的调用了
#尝试获得许可可中断的,每次申请都会检查当前线程
是否被标记为Interrupted 如果被标记了则立马抛出异常进行反馈
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
线程被标记后不会立马抛出异常,而是将标记记录下来当线程被唤醒时候,将interrupt状态填充回去并返回。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
这里面适当展开一点,可能有不少小伙伴在读到李大爷的AQS里面一行代码会懵逼
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)①
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
懵逼产生的根源再有第一个方法比较好理解就是检查当前是不是竞争失败要停掉当前线程,这里面有个线程叫停的api LockSupport.park(),这个大哥其实是能够将线程给喊停的。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
#线程跑到这就等于会休息了
#this是一个blocker,为了方便在jvm检测时候看清楚锁所在位置用的。
return Thread.interrupted();
}
我之前懵逼在这里,就是为何调用这个api,
这个api的意思其实是要解决这个问题,线程在挂起的时候有人调用了该//线程的interrupt方法,也就是给线程打了个标记我,调用interrupted方法其实是会重置标记为false,但是返回值为true。如果标记为false,在调用收回返回false,如果返回true李大爷在外面声明的interrupted变量就会被修改为true,当doAcquireShared方法执行循环时候看上面doAcquireShared代码的①那么就会为true。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
调用了selfInterrupt(),李大爷这么设计其实是为了将线程park时候被修改的状态进行传递,可以理解要死死外面去,我就管排队跟唤醒,其他的操作我给你原模原样传递出去。
再具体的AQS里面的实现细节后面慢慢撕,书归正传
分析后面几个方法的作用
尝试获得许可一次性的,如果获得了许可返回true否则返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
一次尝试申请permits个许可,申请成功则返回true否则返回false
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
指定时间内尝试获取许可传入时间数量,跟时间计量单位
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
释放指定数量的许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
查询可用的许可数量
public int availablePermits() {
return sync.getPermits();
}
清空目前所有许可数
public int drainPermits() {
return sync.drainPermits();
}
减少指定数量的许可数
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
判断当前是否为公平模式
public boolean isFair() {
return sync instanceof FairSync;
}
返回当前是否有排队的线程,通过aqs head!=tail实现,不一定准确
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
返回目前排队队列长度,每次获取是通过aqs迭代累加node链表,因此也不一定准确,
public final int getQueueLength() {
return sync.getQueueLength();
}
返回目前排队线程集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}