一. 构造方法:
-
Semaphore(int permits)
:非公平 -
Semaphore(int permits, boolean fair)
:自定义是否公平
当fair = false
时,此类不对线程获取许可的顺序作任何保证。反之,则按照FIFO(先进先出)的顺序来选择线程并获得许可。
当permits = 1
时,可当作一个互斥锁,通常叫做二进制信号量。因为只有两种状态:1个可用许可或0个可用许可。
二. 源码解析:
acquire()
方法:
if (tryAcquireShared(arg) < 0)//尝试获取次数
doAcquireSharedInterruptibly(arg);//没有次数可获取,入队列
这里只看公平类FairSync
复写的tryAcquireShared
方法:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())// 公平获取锁的关键方法,返回true,就入队列
return -1;
int available = getState();// 获取可用次数
int remaining = available - acquires;// 剩余可用次数
// 直到可用次数小于0(入队列)or 更新可用次数成功(获取到次数无需入队列)
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
/**
* return true:if there is a queued thread preceding the
* current thread(如果有其他线程在当前线程前面排队), return false: if the current thread
* is at the head of the queue or the queue is empty(如果当前线程处于队列头部 or 队列为空)
*/
public final boolean hasQueuedPredecessors() {
// 该方法的正确性依赖于头部先于尾部初始化,
// 并且如果当前线程是队列中的第一个,head.next也是正确的,即第一个入队的线程入队流程结束,详见enq方法。
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors()
方法返回结果的具体情况:
return false:
h == t:队列为空,无需排队
h != t && (s = h.next) != null && s.thread == Thread.currentThread():当前线程在队列头部,无需排队return true: 当前线程前面还有其他线程在排队
h != t && (s = h.next) == null:详见enq方法,头部节点初始化尚未完成,需排队
h != t && (s = h.next) != null && s.thread != Thread.currentThread():队列中至少有一个线程,当前线程需排队
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 入队列,且放在队列的尾部,当一开始队列为空时,会创建一个新的Head节点,第一个入队的节点挂在head后面
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取node的前一个节点
final Node p = node.predecessor();
if (p == head) {
// 只有head节点的下一个节点才能获取到次数,符合队列特性FIFO,其他节点一直无限循环,直到自身成为head的下一个节点
int r = tryAcquireShared(arg);
if (r >= 0) {
// 把 node 设置为新的 head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release()
方法,主要看tryReleaseShared
方法。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();// 当前可用次数
int next = current + releases;// 当前可用次数 + 释放次数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS操作更新可用次数成功
return true;
}
}
三. 用途:
Semaphore通常用于限制访问某些资源的线程数量。
四. 示例:
使用Semaphore定义有界阻塞容器:
一般来说,我们所用的Java容器都是无界的,也就是可以无限制的往容器里添加元素。那么如何定义一个限制元素个数的容器呢?并且超过容器上限时,阻塞添加操作,删除元素后恢复添加操作。
Semaphore可以很好的解决该问题,可以当做一个有次数的锁,acquire后需要release才释放次数,只是acquire的话,锁次数会一直减少。
因为涉及到并发,容器也需要同步Collections.synchronized...
。
以set为例,代码如下:
public class BoundedHashSet<E> {
private final Set<E> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
this.sem = new Semaphore(bound);
}
public boolean add(E e) throws InterruptedException {
sem.acquire();
boolean added = false;
try {
added = set.add(e);
return added;
} finally {
if (!added) {
sem.release();//添加失败,释放次数
}
}
}
public boolean remove(E e) {
boolean removed = set.remove(e);
if (removed) {
sem.release();
}
return removed;
}
}
参考资料:《Java并发编程实战》、JDK文档