AQS(AbstractQueuedSynchronizer)是JUC中提供的用来构建锁和同步器的框架,基于它,我们就可以很容易的构建高效的同步器,例如:ReentrantLock、Semaphore等等。
AQS是锁实现的模板,解决锁实现中大量的细节问题,例如线程等待,资源的同步等。
接下来我将从常见的分布式锁,引申出AQS的实现逻辑。
基于redis的分布式锁
估计能看到这篇文章的同学,都或多或少的知道基于redis的分布式锁的实现,下面就是一种简单的实现:
private static void lock(String flag, long timeoutMilli) {
while (true) {
String result = jedis.set(flag, Thread.currentThread().getName(), "nx", "px", timeoutMilli);
if ("OK".equals(result)) {
return;
}
}
}
简单的几行代码,就实现了分布式的自旋锁,思路:利用redis的原子命令(不存在就设值)进行资源的抢占,并发抢占失败的循环尝试。
非常的简单,我们所知的Juc的锁实现怎么如此复杂呢?接下来,我们就理一理这团乱麻。
AQS(AbstractQueuedSynchronizer)
我们通常都不会直接使用AQS,标准的同步器集合能够满足我们绝大多数需求。但知其然,且知其所以然,这是一种态度。
我们间接使用AQS,用的最多的估计就是锁了,接下来我便以ReentrantLock为例,来揭开AQS的庐山真面目。
ReentrantLock
ReentrantLock是比synchronized更加灵活的同步器,提供了更加丰富的功能,包括:tryLock()、lockInterruptibly()以及公平非公平锁的实现等等。下面选择非公平锁的加锁和释放锁来作为突破口,进行分析。
加锁
为了讲清楚事情,免不了落入俗套,摘抄点源码。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
看到了吗?compareAndSetState就是上面redis中的setNx命令,原子复合操作,如果设置成功,相当于成功获取锁,从而也就得到了代码段的准入权限。
没有设置成功,进入AQS的acquire()方法,继续看代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()有什么用?尝试占有资源,也就是相对于调用compareAndSetState,至于为什么要这样设计?可能是作者想让锁的获取更加及时,而进行的多次尝试吧。
addWaiter()方法,加入一个节点到链表中,这个链表可以认为是锁等待队列,这个节点持有当前线程的引用,表示当前线程。
acquireQueued(),可以对比redis实现的分布式锁,估计应该就是循环中尝试获取锁的实现。下面看看代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//无限循环
for (;;) {
final Node p = node.predecessor();
//链表的头节点是空节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//改变节点状态,然后睡眠当前线程,减少空转
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
就是自旋锁的实现,只不过进行了优化,采用的是类似线程的wait和notify用法。当前线程wait,谁来notify呢?答案是持有锁的线程。
释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
不难发现,其实就只有2步,恢复占用的资源tryRelease(),唤醒wait等待的线程。
如何唤醒,这就是等待队列的作用了,每个节点都有对应线程的引用,就是靠这个引用来唤醒对应的线程。被唤醒的线程将继续循环,争抢锁。
顺便提一下线程等待唤醒用的是LockSupport线程睡眠.