java 计数信号量用来控制访问某个特定资源的操作数量,或者同时执行某个指定操作的数量,还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理着一组虚拟的permits, 初始数量可以通过构造函数来指定, 在执行操作的时候首先要获得许可, 并在使用之后释放许可。如果没有许可, 那么acquire将阻塞直到有许可, 或者被中断或者操作超时。release方法会释放一个许可给Semaphore。
Semaphore有两种构造函数,一个是Semaphore(int permits), 通过这个构造函数生成的Semaphore不能保证获取permits的先后顺序, 意味着可能一个新的thread 会比一个等待了一段时间的另外一个thread先获得许可。第二个构造函数是Semaphore(int permits, boolean fair), 你可以通过制定fair是true来让threads 有序的获取permits。
Semaphore 的同步的实现是通过AQS来保证的, 如果有对AQS还不是很了解的同学可自行google 学习。 Semaphore 是共享的同步机制, 因此实现了两个AQS的两个方法,tryAcquireShared和tryReleaseShared。
'''
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
'''
'''
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
'''
Semaphore 还可以用来将任何一种容器变成有界的阻塞容器, 如下面的BoundedHashSet
'''
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore semaphore;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
this.semaphore = new Semaphore(bound);
}
public boolean add(T t) throws InterruptedException{
semaphore.acquire();
boolean wasAdded = false;
try{
wasAdded = set.add(t);
return wasAdded;
}
finally {
if (!wasAdded){
semaphore.release();
}
}
}
public boolean remove(T t){
boolean wasRemoved = set.remove(t);
if(wasRemoved){
semaphore.release();
}
return wasRemoved;
}
}
'''