简介
Semaphore(信号量)是juc包下的一个工具类,主要是用来控制同时访问公共资源的线程数,这个数量在Semaphore中叫做凭证数(acquires),内部是采用AbstractQueuedSynchronizer去实现的,AbstractQueuedSynchronizer是一个同步队列,juc包下有许多工具类都依赖于该类去实现功能,例如ReentrantReadWriteLock、CountDownLatch、Semaphore等,那么我们接下来就具体看下Semaphore的具体使用方法。
案例
首先我们设定一个背景,假如现在我们去银行取钱,银行的窗口有四个,然后现在有很多人都在等着办理业务,但是窗口只有四个,所以同时能进行业务办理的只有四个,每办理完一个释放一个可用窗口然后才能进行下一个的业务办理,这个场景就可以用Semaphore来实现。
public class SemaphoreTest {
private static final int MAX_COUNT = 4;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(MAX_COUNT, true);
int personNumber = 20;
for (int i = 0; i < personNumber; i++) {
executorService.execute(() -> {
try {
//尝试获取资源
semaphore.acquire(1);
//....办理业务...//
LockSupport.parkNanos(1000 * 1000 * 1000);
//释放资源
semaphore.release(1);
System.out.println("办理完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
源码分析(acquire方法)
接下来我们具体分享下Semaphore的原理,Semaphore主要就是两个方法一个是获取凭证一个是释放凭证资源,我们先来分析下acquire方法。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//尝试获取许可
doAcquireSharedInterruptibly(arg);//获取许可失败进入该方法
}
这里我们可以看出acquire方法内部调用的是sync类的acquireSharedInterruptibly方法,而acquireSharedInterruptibly方法中存在两个方法的调用,首先调用的是tryAcquireShared方法,该方法功能是尝试获取凭证,如果方法返回值小于0则进行acquireSharedInterruptibly方法。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();//获取当前可用的凭证数,state是由validate修饰的,保证变量的可见性。
int remaining = available - acquires;//计算出剩余凭证数量
if (remaining < 0 ||
compareAndSetState(available, remaining))//凭证数小于零或者是cas替换凭证数成功则直接结束该方法。
return remaining;
}
}
这里我看到tryAcquireShared方法,内部是采用自旋重试方式进行锁获取的,单次循环的逻辑可以看上述代码,先是调用hasQueuedPredecessors方法(这个方法的功能是查看当前是否存在排队的线程,如果存在排队的线程且不是当前线程则返回true,当前循环直接终止并返回-1),这个方法只存在公平模式中,非公平模式没有这段判断逻辑。
然后再获取state值也就是可用凭证数,判断其state是否可用,可用进行cas原子替换操作,将计算后的可用凭证数值赋值给state,执行完毕返回remaining值。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//创建SHARED类型node节点
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//获取当前节点的上一个节点
if (p == head) {//如果上一个节点为head,进入if业务逻辑
int r = tryAcquireShared(arg);//尝试获取凭证
if (r >= 0) {//获取成功,并且剩余凭证数大于零进入if业务逻辑
setHeadAndPropagate(node, r);//通知并且重置head节点
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
当tryAcquireShared方法获取凭证数失败后,则会进入上述方法doAcquireSharedInterruptibly中,首先为当前线程创建一个node等待节点,并且添加至aqs队列中,如果当前node节点的上一个节点非head,且上一个节点也是等待状态,则当前线程执行LockSupport.park进行长时间等待直到有可用凭证数为止,如果上一个节点是head节点则进行再一次的尝试获取凭证数,如果获取成功则直接终止循环并且将node节点状态置为已取消,如果获取失败则进行再次循环执行相同的业务逻辑。
这里核心注意下两个方法setHeadAndPropagate、shouldParkAfterFailedAcquire。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
setHeadAndPropagate这个方法只会存在Share模式下,如果是独占则不会存在该方法,该方法的作用就是当可用凭证数大于0的时候进行往后唤醒等待的共享节点。
shouldParkAfterFailedAcquire该方法是判断当前节点是否应该被挂起,如果前驱节点为signal则直接挂起当前线程,如果前驱节点为cancelled则一直往上查找直至找到状态不为cancelled为止并且将前置节点的状态cas替换为signal。
waitStatus 5 个状态值的含义:
cancelled(1):该节点的线程可能由于超时或被中断而处于被取消状态,一旦处于这个状态,节点状态将一直处于cancelled,因此应该从队列中移除。
signal(-1):当前节点为 signal 时,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须被唤醒(unparking)其后继结点。
condition(-2):该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为 0,重新进入阻塞状态。
propagate(-3) :下一个 acquireShared 无条件传播。
0:初始化状态。
源码分析(release方法)
acquire分析完之后我们再看下release方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared方法是属于sync的内部方法,由semaphore.release进行调用,方法内部先调用tryReleaseShared方法,如果成功则调用doReleaseShared方法,我们先看下releaseShared方法内部的tryReleaseShared方法。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared方法功能很简单,该方法就是将state加上释放的releases值然后再cas赋值给state
变量。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
doReleaseShared该方法主要功能就是尝试唤醒head的后继节点,如果后继节点存在且是signal状态则进行唤醒,这里需要关注的是unparkSuccessor方法,unparkSuccessor方法内部对head节点的后继节点进行了二次判断,如果后继节点不存在或是waitStatus为取消状态则从队列尾部开始向前查找知道节点状态不是取消为止,然后进行唤醒。
总结
至此我们已经将Semaphore的acquire方法以及release方法的源码简单的进行了分析,Semaphore可以用来控制访问公共资源的线程数,其内部是基于aqs实现的,aqs内部维护的state变量表示当前还可获取许可证的资线程数,感兴趣的同学可以去深入看下其他的方法,例如带超时时间的tryAcquire方法。