AQS(AbstractQueuedSynchronizer)
学习AQS的必要性
队列同步器AbstractQueuedSynchronizer
(以下简称同步器或AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。并发包的大师(Doug Lea)期望它能够成为实现大部分同步需求的基础。
AQS使用方式和其中的设计模式
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()
、setState(int newState)
和compareAndSetState(int expect,int update)
)来进行操作,因为它们能够保证状态的改变是安全的。
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock
、ReentrantReadWriteLock
和CountDownLatch
等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
模板方法模式
同步器的设计基于模板方法模式。模板方法模式的意图是,定义一个操作中的算法的骨架,而将一些步骤的实现延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
实际例子
我们开了个蛋糕店,蛋糕店不能只卖一种蛋糕呀,于是我们决定先卖奶油蛋糕,芝士蛋糕和慕斯蛋糕。三种蛋糕在制作方式上一样,都包括造型,烘焙和涂抹蛋糕上的东西。所以可以定义一个抽象蛋糕模型
public abstract class AbstractCake {
protected abstract void shape();/*造型*/
protected abstract void apply();/*涂抹*/
protected abstract void brake();/*烤面包*/
/*做个蛋糕 */
public final void run(){
this.shape();
this.apply();
this.brake();
}
protected boolean shouldApply(){
return true;
}
}
然后就可以批量生产三种蛋糕
public class CheeseCake extends AbstractCake{
@Override
protected void shape() {
System.out.println("芝士蛋糕造型");
}
@Override
protected void apply() {
System.out.println("芝士蛋糕涂抹 芝士");
}
@Override
protected void brake() {
System.out.println("芝士蛋糕烘焙 5 min");
}
}
public class CreamCake extends AbstractCake {}
public class MouseCake extends AbstractCake {}
public class MakeCake {
public static void main(String[] args) {
AbstractCake cake = new CheeseCake();
// AbstractCake cake = new CreamCake();
// AbstractCake cake = new MouseCake();
// cake2.run();
}
}
这样一来,不但可以批量生产三种蛋糕,而且如果日后有扩展,只需要继承抽象蛋糕方法就可以了,十分方便,我们天天生意做得越来越赚钱。突然有一天,我们发现市面有一种最简单的小蛋糕销量很好,这种蛋糕就是简单烘烤成型就可以卖,并不需要涂抹什么食材,由于制作简单销售量大,这个品种也很赚钱,于是我们也想要生产这种蛋糕。但是我们发现了一个问题,抽象蛋糕是定义了抽象的涂抹方法的,也就是说扩展的这种蛋糕是必须要实现涂抹方法,这就很鸡儿蛋疼了。怎么办?我们可以将原来的模板修改为带钩子的模板。
/*做个蛋糕 */
public final void run(){
this.shape();
if(this.shouldApply()) {
this.apply();
}
this.brake();
}
protected boolean shouldApply(){
return true;
}
做小蛋糕的时候通过flag来控制是否涂抹,其余已有的蛋糕制作不需要任何修改可以照常进行。
public class SmallCake extends AbstractCake{
@Override
protected boolean shouldApply() {
return false;
}
@Override
protected void shape() {
System.out.println("小蛋糕造型");
}
@Override
protected void apply() {}
@Override
protected void brake() {
System.out.println("小蛋糕烘焙");
}
}
AQS中的方法
模板方法
实现自定义同步组件时,将会调用同步器提供的模板方法,
方法名称 | 描述 |
---|---|
void acquire(int arg) |
独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进人同步队列等待,该方法将会调用重写的tryAcquire(intarg) 方法 |
void acquireInterruptibly(int arg) |
与acquire(int arg) 相同.但是该方法响应中断.当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException 并返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) |
在acquircInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false.如果获取到了返回true
|
void acquireShared(int arg) |
共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) |
与acquireShared(int arg) 相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
在acquireSharedInterruptibly(int arg) 基础上增加了超时限制 |
boolean release(int arg) |
独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) |
共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() |
获取等待在同步队列上的线程集合 |
这些模板方法同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放、同步状态和查询同步队列中的等待线程情况。
可重写的方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) |
独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) |
独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) |
共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) |
共享式释放同步状态 |
protected boolean isHeldExclusively() |
当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占被当前线程所独占 |
访问或修改同步状态的方法
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
-
getState()
:获取当前同步状态。 -
setState(int newState)
:设置当前同步状态。 -
compareAndSetState(int expect,int update) update)
:使用CAS设置当前状态,该方法能够保证状态设置的原子性。
实现独占锁
自己实现一个同步工具类:独占锁
如果我们要实现一个独占锁,那么就需要实现Lock
接口,(所有显示锁都是实现Locak
接口),Lock
接口里面有好几个方法是现需要我们实现的,对于我们现在要实现的独占锁,则lock()
和unlock()
这两个是一定要实现的,那么这两个方法又具体如何实现呢?这个时候就需要借用AQS了,而AQS在实现上它是一个模板方法模式,而且在同步工具类的内部即内部类继承一个AQS。因此这里定义一个内部类Sync
继承AbstractQueuedSynchronizer
。
/**
* 实现我们自己独占锁,不可重入
*/
public class SelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
/*判断处于占用状态*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
/*获得锁*/
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){//由0改为1
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*释放锁*/
@Override
protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/* 返回一个Condition,每个condition都包含了一个condition队列*/
Condition newCondition() {
return new ConditionObject();
}
}
/* 仅需要将操作代理到Sync上即可*/
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
这里我们可以发现在lock()
方法里调用的是acquire
方法,而定义的Sync
类里实现的是tryRelease
方法,看着这两个没有什么关系呀?其实我们可以到acquire
方法里面看看,可以看到它里面调用的就是tryAcquire
方法
CLH队列锁
CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
当一个线程需要获取锁时:
- 创建一个的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用
-
线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred
线程B需要获得锁,同样的流程再来一遍
- 线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)
-
当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点
如上图所示,前驱结点释放锁,线程A的myPred所指向的前驱结点的locked字段变为false,线程A就可以获取到锁。
CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。
Java中的AQS是CLH队列锁的一种变体实现。
ReentrantLock的实现
锁的可重入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
- 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
nonfairTryAcquire
方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)
方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
独占锁
我们可以测试上面自定义的独占锁SelfLock
。下面代码中自己声明一个独占锁SelfLock
实例,然后在线程里打印当前线程名字,启动四个子线程分别执行,可以发现四个线程依次打印自己的名字
public class TestMyLock {
public void test() {
final Lock lock = new SelfLock();
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
SleepTools.second(1);
} finally {
lock.unlock();
}
}
}
// 启动4个子线程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
//w.setDaemon(true);
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
//System.out.println();
}
}
public static void main(String[] args) {
TestMyLock testMyLock = new TestMyLock();
testMyLock.test();
}
}
独占锁有个问题就是它不可重入,就会出现死锁问题,看下面代码
public class TestReenterSelfLock {
static final Lock lock = new ReenterSelfLock();
public void reenter(int x){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+":递归层级:"+x);
int y = x - 1;
if (y==0) return;
else{
reenter(y);
}
} finally {
lock.unlock();
}
}
public void test() {
class Worker extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
SleepTools.second(1);
reenter(3);
}
}
// 启动3个子线程
for (int i = 0; i < 3; i++) {
Worker w = new Worker();
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 100; i++) {
SleepTools.second(1);
}
}
public static void main(String[] args) {
TestReenterSelfLock testMyLock = new TestReenterSelfLock();
testMyLock.test();
}
}
我们看Sync
类的tryAcquire
方法实现调用compareAndSetState(0,1)
方法,第一次调用的时候把状态0改为1,然后调用setExclusiveOwnerThread
方法,把自己设置进去拿到了这把锁,当第二次拿锁的时候还是调用compareAndSetState(0,1)
方法,这个时候就会导致失败,于是就等着释放锁,就会产生死锁。
现在想实现锁的可重入,只需要修改一下tryAcquire
方法,看下面代码:
在tryAcquire
方法里获取锁的时候,多做一下判定,如果当前已经拿到了锁,判定一下当前拿锁的线程是不是当前线程(getExclusiveOwnerThread()==Thread.currentThread()
),是当前线程就把State + 1
,这里加一是表示当前线程又拿了一次锁,与此同时相对于的tryRelease
方法,把State - 1
,当State == 0
的时候表示当前线程对这个锁的持有没有了。通过这种方法实现了锁的可重入
/**
* 实现我们自己独占锁,可重入
*/
public class ReenterSelfLock implements Lock {
/* 静态内部类,自定义同步器*/
private static class Sync extends AbstractQueuedSynchronizer {
/* 是否处于占用状态*/
protected boolean isHeldExclusively() {
return getState() > 0;
}
/* 当状态为0的时候获取锁*/
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}else if(getExclusiveOwnerThread()==Thread.currentThread()){
setState(getState()+1);
return true;
}
return false;
}
/* 释放锁,将状态设置为0*/
protected boolean tryRelease(int releases) {
if(getExclusiveOwnerThread()!=Thread.currentThread()){
throw new IllegalMonitorStateException();
}
if (getState() == 0)
throw new IllegalMonitorStateException();
setState(getState()-1);
if(getState()==0){
setExclusiveOwnerThread(null);
}
return true;
}
/* 返回一个Condition,每个condition都包含了一个condition队列*/
Condition newCondition() {
return new ConditionObject();
}
}
/* 仅需要将操作代理到Sync上即可*/
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
测试自己是是实现的可重入锁
public class TestReenterSelfLock {
static final Lock lock = new ReenterSelfLock();
public void reenter(int x){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+":递归层级:"+x);
int y = x - 1;
if (y==0) return;
else{
reenter(y);
}
} finally {
lock.unlock();
}
}
public void test() {
class Worker extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
SleepTools.second(1);
reenter(3);
}
}
// 启动3个子线程
for (int i = 0; i < 3; i++) {
Worker w = new Worker();
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 100; i++) {
SleepTools.second(1);
}
}
public static void main(String[] args) {
TestReenterSelfLock testMyLock = new TestReenterSelfLock();
testMyLock.test();
}
}
公平锁和非公平锁
ReentrantLock
的构造函数中,默认的无参构造函数将会把Sync对象创建为NonfairSync
对象,这是一个“非公平锁”;而另一个构造函数ReentrantLock(boolean fair)
传入参数为true时将会把Sync对象创建为“公平锁”FairSync
。
公平锁:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire(int acquires)
方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同。tryAcquire
方法,该方法与nonfairTryAcquire(int acquires)
比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()
方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。