ps:源码雷同CountDownLatch,细节不讲解,请查看上一篇CountDownLatch源码分析。
semaphore 也就是我们常说的信号灯,semaphore 可以控制同时访问的线程个数,通过 acquire 获取一个许可,如 果没有就等待,通过 release 释放一个许可。有点类似限流 的作用。叫信号灯的原因也和他的用处有关,比如某商场 就 5 个停车位,每个停车位只能停一辆车,如果这个时候 来了 10 辆车,必须要等前面有空的车位才能进入。
Semaphore 比较常见的就是用来做限流操作了。
案例
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(5); //令牌数 state=5
for(int i=0;i<10;i++){
new Car(semaphore,i).start();
}
}
static class Car extends Thread{
Semaphore semaphore;
int num;
public Car(Semaphore semaphore, int num) {
this.semaphore = semaphore;
this.num = num;
}
@Override
public void run() {
try {
semaphore.acquire(); //5-1 获得令牌.(没拿到令牌,会阻塞,拿到了就可以往下执行)
System.out.println("第"+num+"线程占用一个令牌");
Thread.sleep(3000);
System.out.println("第"+num+"线程释放一个令牌");
semaphore.release(); //释放令牌
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
源码分析
从 Semaphore 的功能来看,我们基本能猜测到它的底层 实现一定是基于 AQS 的共享锁,因为需要实现多个线程共 享一个令牌池
创建 Semaphore 实例的时候,需要一个参数 permits, 这个基本上可以确定是设置给 AQS 的 state 的,然后每 个线程调用 acquire 的时候,执行 state = state - 1,
走喽");
release 的时候执行 state = state + 1,当然,acquire 的 时候,如果 state=0,说明没有资源了,需要等待其他线 程 release。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared分为公平锁FairSync和非公平锁NonfairSync,区别在于公平锁先判断队列中是否有等待的线程
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
有的话就将当前线程放入AQS队列。
如果state-1后判断小于0,那么就执行doAcquireSharedInterruptibly阻塞。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release
tryReleaseShared采用cas操作将state+1,然后去释放共享锁doReleaseShared
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
代码与CountDownLatch相同,到此阅读完毕。