深入浅出AbstractQueuedSynchronizer
在Java多线程编程中,重入锁(ReentrantLock) 和信号量(Semaphore)是两个极其重要的并发控制工具。相信大部分读者都应该比较熟悉它们的使用(如果不清楚的小伙伴,赶快拿出书本翻阅一下)。
但是不知道大家是不是有了解过重入锁和信号量的实现细节? 我就带大家看一看它们的具体实现。
首先,先上一张重要的类图,来说明一下三者之间的关系:
可以看到, 重入锁和信号量都在自己内部,实现了一个AbstractQueuedSynchronizer的子类,子类的名字都是Sync。而这个Sync类,也正是重入锁和信号量的核心实现。子类Sync中的代码也比较少,其核心算法都由AbstractQueuedSynchronizer提供。因此,可以说,只要大家了解了AbstractQueuedSynchronizer,就清楚得知道重入锁和信号量的实现原理了。
了解AbstractQueuedSynchronizer你必须知道的
在正是进入AbstractQueuedSynchronizer之前,还有一些基础知识需要大家了解,这样才能更好的理解AbstractQueuedSynchronizer的实现。
基于许可的多线程控制
为了控制多个线程访问共享资源 ,我们需要为每个访问共享区间的线程派发一个许可。拿到一个许可的线程才能进入共享区间活动。当线程完成工作后,离开共享区间时,必须要归还许可,以确保后续的线程可以正常取得许可。如果许可用完了,那么线程进入共享区间时,就必须等待,这就是控制多线程并行的基本思想。
打个比方,一大群孩子去游乐场玩摩天轮,摩天轮上只能坐20个孩子。但是却来了100个小孩。那么许可以的个数就是20。也就说一次只有20个小孩可以上摩天轮玩,其他的孩子必须排队等待。只有等摩天轮上的孩子离开控制一个位置时,才能有其他小孩上去玩。
因此,使用许可控制线程行为和排队玩摩天轮差不多就是一个意思了。
排他锁和共享锁
第二个重要的概念就是排他锁(exclusive)和共享锁(shared)。顾名思义,在排他模式上,只有一个线程可以访问共享变量,而共享模式则允许多个线程同时访问。简单地说,重入锁是排他的;信号量是共享的。
用摩天轮的话来说,排他锁就是虽然我这里有20个位置,但是小朋友也只能一个一个上哦,多出来的位置怎么办呢,可以空着,也可以让摩天轮上唯一的小孩换着做,他想坐哪儿就坐哪儿,1分钟换个位置,都没有关系。而共享锁,就是玩耍摩天轮正常的打开方式了。
LockSupport
LockSupport可以理解为一个工具类。它的作用很简单,就是挂起和继续执行线程。它的常用的API如下:
- public static void park() : 如果没有可用许可,则挂起当前线程
- public static void unpark(Thread thread):给thread一个可用的许可,让它得以继续执行
因为单词park的意思就是停车,因此这里park()函数就表示让线程暂停。反之,unpark()则表示让线程继续执行。
需要注意的是,LockSupport本身也是基于许可的实现,如何理解这句话呢,请看下面的代码:
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
大家可以猜一下,park()之后,当前线程是停止,还是 可以继续执行呢?
答案是:可以继续执行。那是因为在park()之前,先执行了unpark(),进而释放了一个许可,也就是说当前线程有一个可用的许可。而park()在有可用许可的情况下,是不会阻塞线程的。
综上所述,park()和unpark()的执行效果和它调用的先后顺序没有关系。这一点相当重要,因为在一个多线程的环境中,我们往往很难保证函数调用的先后顺序(都在不同的线程中并发执行),因此,这种基于许可的做法能够最大限度保证程序不出错。
与park()和unpark()相比, 一个典型的反面教材就是Thread.resume()和Thread.suspend()。
看下面的代码:
Thread.currentThread().resume();
Thread.currentThread().suspend();
首先让线程继续执行,接着在挂起线程。这个写法和上面的park()的示例非常接近,但是运行结果却是截然不同的。在这里,当前线程就是卡死。
因此,使用park()和unpark()才是我们的首选。而在AbstractQueuedSynchronizer中,也正是使用了LockSupport的park()和unpark()操作来控制线程的运行状态的。
AbstractQueuedSynchronizer内部数据结构
好了,基础的部分就介绍到这里。下面,让我们切入正题:首先来看一下AbstractQueuedSynchronizer的内部数据结构。
在AbstractQueuedSynchronizer内部,有一个队列,我们把它叫做同步等待队列。它的作用是保存等待在这个锁上的线程(由于lock()操作引起的等待)。此外,为了维护等待在条件变量上的等待线程,AbstractQueuedSynchronizer又需要再维护一个条件变量等待队列,也就是那些由Condition.await()引起阻塞的线程。
由于一个重入锁可以生成多个条件变量对象,因此,一个重入锁就可能有多个条件变量等待队列。实际上,每个条件变量对象内部都维护了一个等待列表。其逻辑结构如下所示:
下面的类图展示了代码层面的具体实现:
可以看到,无论是同步等待队列,还是条件变量等待队列,都使用同一个Node类作为链表的节点。对于同步等待队列,Node中包括链表的上一个元素prev,下一个元素next和线程对象thread。对于条件变量等待队列,还使用nextWaiter表示下一个等待在条件变量队列中的节点。
Node节点另外一个重要的成员是waitStatus,它表示节点等待在队列中的状态:
- CANCELLED:表示线程取消了等待。如果取得锁的过程中发生了一些异常,则可能出现取消的情况,比如等待过程中出现了中断异常或者出现了timeout。
- SIGNAL:表示后续节点需要被唤醒。
- CONDITION:线程等待在条件变量队列中。
- PROPAGATE:在共享模式下,无条件传播releaseShared状态。早期的JDK并没有这个状态,咋看之下,这个状态是多余的。引入这个状态是为了解决共享锁并发释放引起线程挂起的bug 6801020。(随着JDK的不断完善,它的代码也越来越难懂了 :(,就和我们自己的工程代码一样,bug修多了,细节就显得越来越晦涩)
- 0:初始状态
其中CANCELLED=1,SIGNAL=-1,CONDITION=-2,PROPAGATE=-3 。在具体的实现中,就可以简单的通过waitStatus释放小于等于0,来判断是否是CANCELLED状态。
排他锁
了解了AbstractQueuedSynchronizer的基本实现思路和数据结构,接下来一起看一下它的实现细节吧。首先,来看一下排他锁的实现。重入锁是一种 典型的排他锁。
请求锁
下面是排他锁获得请求许可的代码:
public final void acquire(int arg) {
//尝试获得许可, arg为许可的个数。对于重入锁来说,每次请求1个。
if (!tryAcquire(arg) &&
// 如果tryAcquire 失败,则先使用addWaiter()将当前线程加入同步等待队列
// 然后继续尝试获得锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
进入一步看一下tryAcquire()函数。该函数的作用是尝试获得一个许可。对于AbstractQueuedSynchronizer来说,这是一个未实现的抽象函数。
具体实现在子类中。在重入锁,读写锁,信号量等实现中, 都有各自的实现。
如果tryAcquire()成功,则acquire()直接返回成功。如果失败,就用addWaiter()将当前线程加入同步等待队列。
接着, 对已经在队列中的线程请求锁,使用acquireQueued()函数,从函数名字上可以看到,其参数node,必须是一个已经在队列中等待的节点。它的功能就是为已经在队列中的Node请求一个许可。
这个函数大家要好好看看,因为无论是普通的lock()方法,还是条件变量的await()都会使用这个方法。
条件变量等待
如果调用Condition.await(),那么线程也会进入等待,下面来看实现:
Condition对象的signal()通知
signal()通知的时候,是在条件等待队列中,按照FIFO进行,首先从第一个节点下手:
release()释放锁
释放排他锁很简单
public final boolean release(int arg) {
//tryRelease()是一个抽象方法,在子类中有具体实现和tryAcquire()一样
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 从队列中唤醒一个等待中的线程(遇到CANCEL的直接跳过)
unparkSuccessor(h);
return true;
}
return false;
}
共享锁
与排他锁相比,共享锁的实现略微复杂一点。这也很好理解。因为排他锁的场景很简单,单进单出,而共享锁就不一样了。可能是N进M出,处理起来要麻烦一些。但是,他们的核心思想还是一致的。共享锁的几个典型应用有:信号量,读写锁中的写锁。
获得共享锁
为了实现共享锁,在AbstractQueuedSynchronizer中,专门有一套针对共享锁的方法。
获得共享锁使用acquireShared()方法:
释放共享锁
释放共享锁的代码如下:
public final boolean releaseShared(int arg) {
//tryReleaseShared()尝试释放许可,这是一个抽象方法,需要在子类中实现
if (tryReleaseShared(arg)) {
//上述代码中已经出现这个函数了,就是唤醒线程,设置传播状态
doReleaseShared();
return true;
}
return false;
}
写在最后的话
AbstractQueuedSynchronizer 是一个比较复杂的实现,要完全理解其中的细节还需要慢慢琢磨。
这篇文章也只能起到一个抛砖引玉的作用,将AbstractQueuedSynchronizer的设计思想,核心数据结构已经核心实现代码展示给大家。希望对大家理解AbstractQueuedSynchronizer的实现,以及理解重入锁,信号量,读写锁有一定帮助。