Semaphore
被称为信号量,JUC 中非常重要也是很常见的一个类,内部通过 AQS 实现,主要控制线程间同步,应用场景例如限流。
Semaphore
底层基于 AQS 类似 ReentrantLock
实现了公平和非公平两种机制,AQS 中 state
被设计为许可证的剩余数量。注意 AQS 设计思想包括独占锁和共享锁,Semaphore
正是利用了 AQS 的共享锁机制。
创建 Semaphore
Semaphore
包含两个构造方法,分别用来创建公平和非公平的锁机制,默认为非公平机制,必须要指定的还有“许可证” permits
的数量。
// Fair mode: pass true as the second constructor argument
Semaphore fairSp = new Semaphore(10, true);
// Non-fair mode: the single-argument constructor defaults to non-fair
Semaphore nonFairSp = new Semaphore(10);
state 控制
获取锁
下方选取非公平锁示例
/**
 * Tries to take {@code acquires} permits without consulting the wait queue.
 *
 * @param acquires number of permits requested
 * @return the permit count left after the attempt; a negative value means
 *         there were not enough permits and the state was left untouched
 */
final int nonfairTryAcquireShared(int acquires) {
    // Retry until either the request is known to be unsatisfiable
    // or the CAS that takes the permits succeeds.
    while (true) {
        final int current = getState();        // permits available right now
        final int left = current - acquires;   // permits left if we succeed
        if (left < 0) {
            // Not enough permits: report the shortfall without touching state.
            return left;
        }
        if (compareAndSetState(current, left)) {
            return left;
        }
        // CAS lost a race with another thread; re-read the state and retry.
    }
}
如果剩余数量不足(例如 available = 0、acquires = 1,则 remaining = 0 - 1 == -1),tryAcquireShared 返回负数,线程将进入下面的排队流程:
/**
 * Acquires in shared mode, aborting if the calling thread is interrupted.
 *
 * @param arg the acquire argument handed to {@code tryAcquireShared}
 * @throws InterruptedException if the thread is already interrupted on entry,
 *         or if the queued acquire path throws it
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // Fast path: try to take the shared lock directly.
    final boolean gotIt = tryAcquireShared(arg) >= 0;
    if (!gotIt) {
        // Slow path: enqueue and wait.
        doAcquireSharedInterruptibly(arg);
    }
}
进入队列,包装线程节点 Shared Thread Node,使用 LockSupport
挂起线程。
/**
 * Queued, interruptible shared-mode acquire: enqueues a shared node and
 * loops until the acquire succeeds or the thread is interrupted.
 *
 * @param arg the acquire argument handed to {@code tryAcquireShared}
 * @throws InterruptedException if {@code parkAndCheckInterrupt} reports an
 *         interrupt while waiting
 */
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node waiter = addWaiter(Node.SHARED);
    boolean acquired = false;
    try {
        while (true) {
            final Node pred = waiter.predecessor();
            // Only the node directly behind head attempts to acquire.
            if (pred == head) {
                final int result = tryAcquireShared(arg);
                if (result >= 0) {
                    setHeadAndPropagate(waiter, result);
                    pred.next = null; // help GC
                    acquired = true;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(pred, waiter)
                    && parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        // Any exit without a successful acquire cancels the queued node.
        if (!acquired) {
            cancelAcquire(waiter);
        }
    }
}
释放和唤醒
调用 Semaphore.release
方法。
/**
 * Releases a single permit by delegating to {@code sync.releaseShared}.
 */
public void release() {
    final int singlePermit = 1;
    sync.releaseShared(singlePermit);
}
调用子方法释放锁
/**
 * Releases in shared mode.
 *
 * @param arg the release argument handed to {@code tryReleaseShared}
 * @return {@code true} if {@code tryReleaseShared} succeeded (in which case
 *         {@code doReleaseShared} has been invoked), {@code false} otherwise
 */
public final boolean releaseShared(int arg) {
    // tryReleaseShared is the hook that actually updates the state.
    if (!tryReleaseShared(arg)) {
        return false;
    }
    doReleaseShared();
    return true;
}
CAS 归还许可证
/**
 * Returns {@code releases} permits to the state, retrying until the CAS
 * succeeds.
 *
 * @param releases number of permits to give back
 * @return always {@code true} once the state has been updated
 * @throws Error if adding {@code releases} overflows the permit count
 */
protected final boolean tryReleaseShared(int releases) {
    while (true) {
        final int before = getState();        // e.g. 0 permits left
        final int after = before + releases;  // e.g. 0 + 1
        if (after < before) { // overflow
            throw new Error("Maximum permit count exceeded");
        }
        if (compareAndSetState(before, after)) {
            return true;
        }
        // Another thread changed the state in between; re-read and retry.
    }
}
唤醒队列(链表)中头节点之后状态符合条件的线程(被唤醒的线程获取许可成功后会成为新的头节点,原头节点随之出队)
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking. private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/**
 * Wakes up the successor of {@code node}, if one exists.
 *
 * @param node the node whose successor should be unparked
 */
private void unparkSuccessor(Node node) {
    /*
     * A negative status means a signal may be needed; try to clear it
     * ahead of signalling. It is fine if this CAS fails or if the
     * waiting thread changes the status.
     */
    final int status = node.waitStatus;
    if (status < 0) {
        compareAndSetWaitStatus(node, status, 0);
    }

    /*
     * The thread to wake is normally in the next node. If that node is
     * null or has a positive (cancelled) status, walk backwards from the
     * tail; the last match found is the candidate nearest to node.
     */
    Node candidate = node.next;
    if (candidate == null || candidate.waitStatus > 0) {
        candidate = null;
        Node cursor = tail;
        while (cursor != null && cursor != node) {
            if (cursor.waitStatus <= 0) {
                candidate = cursor;
            }
            cursor = cursor.prev;
        }
    }
    if (candidate != null) {
        LockSupport.unpark(candidate.thread);
    }
}