AQS是JDK并发包的基础,2018年研究了一番源码,略懂,近日再阅,又有提高。AQS的派生类有很多,本文介绍Semaphore,先从类注释说起。
- 第一段
A counting semaphore. Conceptually, a semaphore maintains a set of permits.
Each {@link #acquire} blocks if necessary until a permit is available, and then
takes it. Each {@link #release} adds a permit, potentially releasing a blocking
acquirer. However, no actual permit objects are used; the {@code Semaphore}
just keeps a count of the number available and acts accordingly.
注释说,Semaphore管理的是一组许可证,acquire方法将获取一个许可证,如果池中还有许可证,则获取成功并持有该许可证,否则阻塞。release方法向池中添加一个许可证,释放成功,acquire方法阻塞的线程就会唤醒并获得许可证。实际上许可证并不特指某些对象,而是由一个整形数字扮演许可证的角色。
- 第二段
Semaphores are often used to restrict the number of threads than can access
some (physical or logical) resource. For example, here is a class that uses a
semaphore to control access to a pool of items:
这段话讲明Semaphore的用处,常用来限制线程访问共享资源的数量,且注释中给出了一个源码例子:
class Pool {
// 池中允许的许可证数量
private static final int MAX_AVAILABLE = 100;
// 实例化Semaphore
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x)) {
available.release();
}
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = new Object[MAX_AVAILABLE];
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
}
return false;
}
}
return false;
}
}
- 第三段
Before obtaining an item each thread must acquire a permit from
the semaphore, guaranteeing that an item is available for use. When
the thread has finished with the item it is returned back to the pool
and a permit is returned to the semaphore, allowing another thread
to acquire that item. Note that no synchronization lock is held when
{@link #acquire} is called as that would prevent an item from being
returned to the pool.The semaphore encapsulates the synchronization
needed to restrict access to the pool, separately from any synchronization
needed to maintain the consistency of the pool itself.
上文讲述例子的含义:
线程从Semaphore中获取许可证才能获得一个item,前提是Semaphore中
必须有许可证存在。线程使用完item后,先标记item未被使用,再向Semaphore
归还许可证,这样其他线程就能再次利用这个许可证。值得注意的是,线程执行
acquire方法并没有使用同步器,这么做的好处是避免获取许可证时无法归还许可
证。因为线程同步只允许一个线程获得锁。实际上Semaphore基于AQS,封装了
同步原语,目的是限制线程访问资源的数量,剥离了同步锁,实现池的一致性。
- 第四段
A semaphore initialized to one, and which is used such that it only has at most
one permit available, can serve as a mutual exclusion lock. This is more commonly
known as a <em>binary semaphore</em>, because it only has two states: one
permit available, or zero permits available. When used in this way, the binary
semaphore has the property (unlike many {@link java.util.concurrent.locks.Lock}
implementations), that the "lock" can be released by a thread other
than the owner (as semaphores have no notion of ownership). This can be useful
in some specialized contexts, such as deadlock recovery.
这段话告诉我们,Semaphore的特殊用法,当许可证数量赋值为1时,许可证只有两个状态,1代表可用,0代表不可用,这样的Semaphore叫二进制信号量,性质跟Lock接口的实现类很像,如ReentrantLock可重入锁。但是又有区别,互斥锁要求必须同一个线程获取锁,同一个线程释放锁,二进制信号量允许一个线程获取锁,另一个线程释放锁,这样特性常用在死锁恢复中。代码案例如下:
public class WaitNotifyAndLockSupport {
private static final Object obj = new Object();
public static void main(String[] args) {
Executor executor = Executors.newCachedThreadPool();
Pool pool = new Pool();
for (int i = 0; i < 10; i++) {
executor.execute(new Student(i+1, pool));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.releaseTrack();
}
static class Pool {
private static final int MAX_AVAILABLE = 1;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
protected Track[] items = {new Track(1)};
protected boolean[] used = new boolean[MAX_AVAILABLE];
public Track getTrack() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void releaseTrack(Object x) {
if (markAsUnused(x)) {
available.release();
}
}
public void releaseTrack() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
used[i] = false;
}
available.release();
}
protected synchronized Track getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
}
return false;
}
}
return false;
}
}
static class Track {
private int num;
public Track(int num) {
this.num = num;
}
@Override
public String toString() {
return "Track{" +
"num=" + num +
'}';
}
}
static class Student implements Runnable {
private int num;
private Pool pool;
public Student(int num, Pool pool) {
this.num = num;
this.pool = pool;
}
@Override
public void run() {
try {
//获取跑道
Track track = pool.getTrack();
if (track != null) {
System.out.println("学生" + num + "在" + track.toString() + "上跑步");
TimeUnit.SECONDS.sleep(2);
// System.out.println("学生" + num + "释放" + track.toString());
//释放跑道
// pool.releaseTrack(track);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果:
学生1在Track{num=1}上跑步
学生3在Track{num=1}上跑步
线程Student 如果不释放许可证,在main主线程中释放许可证,则等待的10个线程中会有一个线程获得许可证。
- 第五段
The constructor for this class optionally accepts a <em>fairness</em>
parameter. When set false, this class makes no guarantees about the
order in which threads acquire permits. In particular, <em>barging</em>
is permitted, that is, a thread invoking {@link #acquire} can be allocated
a permit ahead of a thread that has been waiting - logically the new thread
places itself at the head of the queue of waiting threads. When fairness is
set true, the semaphore guarantees that threads invoking any of the {@link
#acquire() acquire} methods are selected to obtain permits in the order in
which their invocation of those methods was processed (first-in-first-out; FIFO).
Note that FIFO ordering necessarily applies to specific internal points of
execution within these methods. So, it is possible for one thread to invoke
{@code acquire} before another, but reach the ordering point after the other,
and similarly upon return from the method. Also note that the untimed {@link
#tryAcquire() tryAcquire} methods do not honor the fairness setting, but will
take any permits that are available.
这段话很长,将的内容却很简单,说的是公平锁和非公平锁,公平锁遵循FIFO原则,先从AQS队列头部唤醒阻塞的线程,非公平锁并不遵循这一原则,如果刚好有线程释放许可证,那么来的找不如来的巧的线程将获得许可证。
- 第六段
Generally, semaphores used to control resource access should be
initialized as fair, to ensure that no thread is starved out from accessing
a resource. When using semaphores for other kinds of synchronization
control, the throughput advantages of non-fair ordering often outweigh
fairness considerations.
这段话说的是公平锁、非公平锁的选择,公平锁保证等久的线程线程不会空等,公平锁释保证吞吐量,保证许可证的高可利用率。
- 第七段
This class also provides convenience methods to {@link #acquire(int)
acquire} and {@link #release(int) release} multiple permits at a time.
Beware of the increased risk of indefinite postponement when these
methods are used without fairness set true.
最后一段话告诉我们,acquire和release支持同一时间使用,此时若采用非公平锁,那么线程可能无限期阻塞。