前言
Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore
可以用于做流量控制,特别是公用资源有限的应用场景,比如一个拥有固定车位的停车场,当停车场里面所有位置都已经被占满后,后面的车子只能等待车位,如果此时有1
个车子离开此停车场,那么等待的汽车中就可以有1
辆汽车进入该停车场,至于进入的规则,就需要看是Nonfair
还是Fair
机制了.
本文源代码: 代码下载
例子1:了解Semaphore
此例子初始一个大小为
3
的停车场,然后启动6
辆车去停车场停车,每辆车在停车场的时间由随机数产生.
package com.sourcecode.concurrencytools_Semaphore;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
static Random random = new Random();
static class ParkingLot {
Semaphore semaphore;
ParkingLot(int size) {
semaphore = new Semaphore(size);
}
public void park() {
try {
semaphore.acquire();
int waitTime = random.nextInt(10);
System.out.println(Thread.currentThread().getName() + " parks, it takes " + waitTime + " seconds.");
TimeUnit.SECONDS.sleep(waitTime);
System.out.println(Thread.currentThread().getName() + " leaves.");
semaphore.release();
} catch (InterruptedException ie) {
ie.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
static class Car extends Thread {
ParkingLot parkingLot;
public Car(ParkingLot parkingLot) {
this.parkingLot = parkingLot;
}
public void run() {
this.parkingLot.park();
}
}
public static void main(String[] args){
ParkingLot parking = new ParkingLot(3);
for(int i = 0 ; i < 6 ; i++){
new Car(parking).start();
}
}
}
结果如下: 首先
thread-1
和thread-2
和thread-0
进入到停车场,然后后面的车没办法再进入到停车场,因为停车场已经满了,当thread-0
离开停车场后,可以有一辆车子进入到停车场,这个车子是thread-3
. 后面的以此类推即可.
Thread-1 parks, it takes 1 seconds.
Thread-2 parks, it takes 3 seconds.
Thread-0 parks, it takes 0 seconds.
Thread-0 leaves.
Thread-3 parks, it takes 6 seconds.
Thread-1 leaves.
Thread-4 parks, it takes 6 seconds.
Thread-2 leaves.
Thread-5 parks, it takes 0 seconds.
Thread-5 leaves.
Thread-3 leaves.
Thread-4 leaves.
实现思路
该类与
ReentrantLock
实现类似,用到了公平和非公平机制,并且都是用到的共享锁.
从图中可以知道Semaphore
有个内部类Sync
,该类继承于AbstractQueuedSynchronizer
(AQS
是实现的基础),并且该Sync
有两种实现类,FairSync
和NonfairSync
,分别代表公平机制和非公平机制.
公平机制表示的是在等待锁的过程中如果可以获得锁了,先等待的线程必须要先获得锁.
从ReentrantLock
的实现也可以知道Semaphore
的方法也是依赖于Sync
的实例sync
. (所以关键地方还是AQS
,是必须要掌握的基础).
semaphore_framework.png
Sync类及其子类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 初始化当前AQS状态
Sync(int permits) {
setState(permits);
}
// 返回当前AQS状态
final int getPermits() {
return getState();
}
// 返回不公平机制下获得共享锁
// 返回值小于0 表示没有获得锁
// 返回值大于等于0 表明获得锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放共享锁 释放的个数releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 减少许可数量reductions
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 将许可数量减为0
// 返回值是减少的数量
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 返回值小于0 表示没有获得锁
// 返回值大于等于0 表示获得锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有前驱节点 保持公平机制 返回一个负数
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
其实这些跟重入锁
ReentrantLock
差不多,没什么太多好分析的.
构造函数
private final Sync sync;
// 默认为非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 根据fair来判断构造公平锁还是非公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire()方法和release()方法
由于
Semaphore
中的方法都是通过sync
调用的并且原理类似,因此拿出acquire()
和release()
方法进行分析.
acquire()方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
作用: 可以响应中断式的获取共享锁.
该方法会调用
AQS
中的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
接着会根据子类
Sync
重写的tryAcquireShared(arg)
方法来判断是否获得锁,所以如果是非公平锁则调用nonfairTryAcquireShared(int acquires)
,如果是公平锁就调用tryAcquireShared(int acquires)
方法.
// 返回值小于0 表示没有获得锁
// 返回值大于等于0 表示获得锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有前驱节点 保持公平机制 返回一个负数
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 返回不公平机制下获得共享锁
// 返回值小于0 表示没有获得锁
// 返回值大于等于0 表明获得锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
可以看到公平锁与非公平锁的区别在于增加了一个
hasQueuedPredecessors()
判断同步等待队列中是否有等待的线程.
release()方法
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
作用: 释放
permits
个许可
与上面类似,它会先调用AQS
中的releaseShared(int arg)
,进而调用Sync
中的tryReleaseShared(int arg)
方法判断是否释放成功.
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//Sync
// 释放共享锁 释放的个数releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
可以看到
acquire()
和release()
在获取和释放信号量的时候没有像重入锁一样去判断是否是当前线程,而是直接加上或者减去状态数,可以知道该状态值是无状态型的,跟线程无关.因此也可以在同一个线程中在获得信号量后继续获取信号量.
例子2: 测试一个线程中不断获取信号量
开启一个大小为
3
的信号量,并且使用同一个线程获得3
次.
package com.sourcecode.concurrencytools_Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest3 {
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) throws InterruptedException {
MyThread thread1 = new MyThread();
thread1.start();
TimeUnit.SECONDS.sleep(2);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get locks.");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " finishes");
}
static class MyThread extends Thread {
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 1");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 2");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 3");
TimeUnit.SECONDS.sleep(10);
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 1");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 2");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 3");
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
结果如下: 可以看到线程
0
获得了3
次并且当此线程释放一个信号量后,主线程就可以去获取信号量了.
Thread-0 get Locks 1
Thread-0 get Locks 2
Thread-0 get Locks 3
Thread-0 release locks 1
main get locks.
main finishes
Thread-0 release locks 2
Thread-0 release locks 3
例子3: 关于try...finally...的写法讨论
在网上有可以对于
acquire()
和release
使用try...finally...
的写法,有点疑惑便根据Semaphore
源代码分析和测试了一下.先看下面的例子.
初始化了一个大小为
1
的信号量,启动一个线程myThread
去获取信号量,获取信号量后让其休眠10s
后,再释放信号量,所以让另外一个线程myThread2
在等待获取信号量的时候发生中断,此时根据我们上面分析源码应该可以知道acquire()
是响应中断的,因此此时会抛出中断异常(没有成功获得锁),进而此时根据该写法会即使没有获得信号量也要进入到finally
代码块中去释放信号量,进而会使信号量的许可证加1
.(因为即使没有获得信号量的情况下去调用release()
方法不会报错,会使许可证加1
).
package com.sourcecode.concurrencytools_Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest2 {
static Semaphore semaphore = new Semaphore(1);
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
MyThread myThread2 = new MyThread();
myThread2.start();
TimeUnit.SECONDS.sleep(2);
myThread2.interrupt();
}
static class MyThread extends Thread {
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Semaphore");
} catch (InterruptedException ie) {
System.out.println(Thread.currentThread().getName() + " semaphore.acquire, ie:" + ie);
} finally {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
System.out.println(Thread.currentThread().getName() + "wait 10s, ie:" + ie);
}
System.out.println(Thread.currentThread().getName() + " release Semaphore.");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " available:" + semaphore.availablePermits());
}
}
}
}
结果如下: 可以看到
Thread-1
在没有获得信号量的情况下依然调用了release()
方法进而使得信号量许可数量为2
,明显的错误了,因为总的信号量许可证数量是1
.
Thread-0 get Semaphore
Thread-1 semaphore.acquire, ie:java.lang.InterruptedException
Thread-0 release Semaphore.
Thread-0 available:1
Thread-1 release Semaphore.
Thread-1 available:2
正确写法参考上面的例子
1
或例子2
即可.
参考
1. Java并发编程的艺术