Java并发编程:用AQS写一把可重入锁

AQS简介

AQS是J.U.C包下AbstractQueuedSynchronizer抽象的队列式的同步器的简称,这是一个抽象类,它定义了一套多线程访问共享资源的同步器框架,J.U.C包下的许多同步类实现都依赖于它,比如ReentrantLock/Semaphore/CountDownLatch,可以说这个抽象类是J.U.C并发包的基础。

之所以把这一章节叫做AQS简介而不是叫AQS详解,是因为已经有大神写过详解的文章Java并发之AQS详解,这篇文章对AQS的源码解析很透彻,博主读了之后受益匪浅,鉴于对原作者的尊重,所以如上附上原文的链接。要想弄懂AQS还得从这一图说起。

如上图所述,AQS维护了一个state变量和一个FIFO先进先出队列,这个state用来干嘛的可以参考我前一篇博客中的那个count计数器,就是用来计数线程的重入次数的。上一篇博客还用了一个变量currentThread来记录已经获得这把锁的线程。而我们的AQS用的是一个先进先出的等待队列的完成这件事。当新的线程进来的时候,AQS调用tryAquice()方法试图去获得锁,如果获得的话,则调用interupt中断方法;如果没有获得锁,则把当前线程放入排队的队列,AQS队列不断的自旋尝试去判断已经占用的线程是否已经放开,如果锁依然被线程继续占用,则继续添加进等待队列。

源码如下:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

那个addWaiter方法,此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

private Node addWaiter(Node mode) {

//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)

Node node = new Node(Thread.currentThread(), mode);

//尝试快速方式直接放到队尾。

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

//上一步失败则通过enq入队。

enq(node);

return node;

}

我们以独占式的同步帮助器为例来看一下AQS的执行流程。

大致流程如下:

调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;

没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

上述的流程和步骤已经是AQS帮我们实现了的功能,估计我讲的也不太清楚,这里再次推荐读者阅读这篇文章Java并发之AQS详解,下面我们应该来看看如何使用AQS。

用AQS写一把互斥锁

互斥锁是为了保证数据的安全,在任一时刻只能有一个线程访问该对象。由上一个小节我们可知,AQS已经为我们实现所有排队和阻塞机制,我们只需要调用getState()、setState(int) 和 compareAndSetState(int, int) 方发来维护state变量的数值和调用setExclusiveOwnerThread/getExclusiveOwnerThread来维护当前占用的线程是谁就行了。翻越JDK提供的API,它建议我们:应该将子类定义为非公共内部帮助器类,可用它们来实现其封闭类的同步属性。类 AbstractQueuedSynchronizer 没有实现任何同步接口。而是定义了诸如 acquireInterruptibly(int) 之类的一些方法,在适当的时候可以通过具体的锁和相关同步器来调用它们,以实现其公共方法。

什么意思呢?意思就是建议我们:如果你想要使用AQS实现一把互斥锁Mutex,就必须先用一个类去继承AbstractQueuedSynchronizer这个抽象类,然而这个实现的子类(暂取名叫Sync)应该是作为Mutex的内部类来用的,提供给Mutex当作帮助器来使用。那么Lock接口,Mutex互斥锁,AbstractQueuedSynchronizer抽象类和Sync帮助器这四者存在什么联系呢?为了避免你听糊涂了,下面我整理他们的UML类图如下。

由上图可知:Mutex互斥锁继承了Lock锁的接口,具有锁的属性,可以提供上锁和释放锁的方法,他是对外提供服务的服务者,而Mutex类有个Sync类型的私有对象sync,这个私有对象继承了AbstractQueuedSynchronizer抽象类,是Mutex锁和AQS的桥梁,是加锁和释放锁真正的服务者。如果你看明白了上面的UML类图,那么我们的Mutex互斥锁的定义应该如下:

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

private Sync sync = new Sync();

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

// TODO Auto-generated method stub

return super.tryAcquire(arg);

}

@Override

protected boolean tryRelease(int arg) {

// TODO Auto-generated method stub

return super.tryRelease(arg);

}

}

@Override

public void lock() {

// TODO Auto-generated method stub

}

@Override

public void lockInterruptibly() throws InterruptedException {

// TODO Auto-generated method stub

}

@Override

public boolean tryLock() {

// TODO Auto-generated method stub

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

// TODO Auto-generated method stub

return false;

}

@Override

public void unlock() {

// TODO Auto-generated method stub

}

@Override

public Condition newCondition() {

// TODO Auto-generated method stub

return null;

}

}

 这里我们实现的是独占式的锁,Sync帮助器只需要覆盖父类的tryAcquire(),tryRelease()方法就行了,其他方法可以暂时删掉,如共享式的tryAcquireShared(),tryReleaseShared(),已经Condition用到的isHeldExclusively()和toString()方法都可以暂时不用实现,因为我们只是想先用AQS来做一把可以保证数据安全的锁,考虑的问题暂时没有那么多。

/**

* 互斥锁

* @author 张仕宗

* @date 2018.11.9

*/

public class Mutex implements Lock{

//AQS子类的对象,Mutex互斥锁用它来工作

private Sync sync = new Sync();

//Sync同步器类作为公共内部帮助器,可用它来实现其封闭类的同步属性

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

assert arg == 1; //这里用到了断言,互斥锁,锁只能被获取一次,如果arg不等于1,则直接中断

if(this.compareAndSetState(0, 1)) { //这里做一下判断,如果state的值为等于0,立马将state设置为1

//返回true,告诉acqure方法,获取锁成功

return true;

}

return false;

}

@Override

protected boolean tryRelease(int arg) {

//释放锁,由于这是一把互斥锁,state不是0就是1,所以你需要做两步:

//1.直接将state置为0

this.setState(0);

//返回true,告诉aqs的release方法释放锁成功

return true;

}

}

/**

* 上锁的方法

*/

@Override

public void lock() {

sync.acquire(1);

}

/**

* 释放锁的方法

*/

@Override

public void unlock() {

sync.release(1);

}

@Override

public void lockInterruptibly() throws InterruptedException {

// TODO Auto-generated method stub

}

@Override

public boolean tryLock() {

// TODO Auto-generated method stub

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

// TODO Auto-generated method stub

return false;

}

@Override

public Condition newCondition() {

// TODO Auto-generated method stub

return null;

}

}

 上诉代码实现了一把最简单的锁,我们只实现其lock()和unlock()方法,其他方法请暂时忽略,而lock()方法和unlock()方法是如何实现的呢?lock()方法调用了Sync帮助器对象的sync.acquire(1)方法,由于我们的帮助器Sync并没有实现这个方法,所以实际调用的是AQS的acquire()方法,而AQS这时候做了什么时呢?再来一次该方法的源码:

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

 次方法干的第一件事就是去调用tryAcquire()方法,这个方法需要Sync来实现,如果自己的Sync没有实现这个方法的话,父类会直接抛出UnsupportedOperationException这个异常。

@Override

protected boolean tryAcquire(int arg) {

assert arg == 1; //这里用到了断言,互斥锁,锁只能被获取一次,如果arg不等于1,则直接中断

if(this.compareAndSetState(0, 1)) { //这里做一下判断,如果state的值为等于0,立马将state设置为1

//返回true,告诉acqure方法,获取锁成功

return true;

}

return false;

}

由于这是一把互斥锁,所以只能有同一时刻只能获得一次锁。代码中用到了assert断言,如果预获得锁的次数不是1,则中断。接下来if中判断state状态是否为0,如果state状态为0,则说明锁还没有被占用,那么我立刻占用这把锁,判断state当前值和设置state为1这两步用原子性操作的代码语句是this.compareAndSetState(0, 1),并立马放回true,这时候AQS获得返回值,获得锁成功。如果是第二个线程进来,if语句判断得到的值非0,则直接返回false,这时候AQS将新进来的线程放进FIFO队列排队。

接下来看看Mutex的unlock()方法,该方法调用了sync.release(1),看看AQS这时候做了什么!

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

 此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。同样的,我们的同步器Sync需要去实现这个tryRelease方法,不然同样会抛出UnsupportedOperationException异常。Sync的tryRelease方法比较简单:

@Override

protected boolean tryRelease(int arg) {

//释放锁,由于这是一把互斥锁,state不是0就是1,所以你需要做两步:

//1.直接将state置为0

this.setState(0);

//返回true,告诉aqs的release方法释放锁成功

return true;

}

 只需要设置state为0即可,由于这是一把互斥锁,state不是0就是1所以直接调用this.setSate(0)。

用AQS写一把重入锁

上诉的Mutex并非一把可重入锁,为了实现这把锁能够让同一线程多次进来,回忆一下上一篇博客中怎么实现的?当时的做法是在锁的lock()自旋方法中判断新进来的是不是正在运行的线程,如果新进来的线程就是正在运行的线程,则获取锁成功,并让计数器+1。而在释放锁的时候,如果释放锁的线程等于当前线程,让计数器-1,只有当计数器count归零的时候才真正的释放锁。同样的,用AQS实现的锁也是这个思路,那么我们的tryAcquice方法如下:

@Override

protected boolean tryAcquire(int arg) {

//如果第一个线程进来,直接获得锁,并设置当前独占的线程为当前线程

int state = this.getState();

if(state == 0) { //state为0,说明当前没有线程占用该线程

if(this.compareAndSetState(0, arg)) { //判断当前state值,第一个线程进来,立刻设置state为arg

this.setExclusiveOwnerThread(Thread.currentThread()); //设置当前独占线程为当前线程

return true; //告诉顶级aqs获取锁成功

}

} else { //如果是第二个线程进来

Thread currentThread = Thread.currentThread();//当前进来的线程

Thread ownerThread = this.getExclusiveOwnerThread();//已经保存进去的独占式线程

if(currentThread == ownerThread) { //判断一下进来的线程和保存进去的线程是同一线程么?如果是,则获取锁成功,如果不是则获取锁失败

this.setState(state+arg); //设置state状态

return true;

}

}

return false;

}

 tryAcquice()方法代码含义如注释所示,与Mutex互斥锁不同的是当state状态不为0时我们的逻辑处理,如果第二次进来的线程currentThread和正在独占的线程ownerThread为统一线程,第一步设置state增加1,第二步返回true给AQS。

tryRelease()方法代码如下:

@Override

protected boolean tryRelease(int arg) {

//锁的获取和锁的释放是一一对应的,获取过多少次锁就释放多少次锁

if(Thread.currentThread() != this.getExclusiveOwnerThread()) {

//如果释放锁的不是当前线程,则抛出异常

throw new RuntimeException();

}

int state = this.getState()-arg;

//接下来判断state是否已经归零,只有state归零的时候才真正的释放锁

if(state == 0) {

//state已经归零,做扫尾工作

this.setState(0);

this.setExclusiveOwnerThread(null);

return true;

}

this.setState(state);

return false;

}

 tryRelease()首先是获取当前state的值,并对这个值进行欲判:如果当前值state减去sync.release()传来的参数归零,则真正的释放锁,那么我们要做的第一步是设置state为0,接着设置当前独占的线程为null,再然后返回true告诉AQS释放锁成功。如果如果当前值state减去sync.release()传来的参数归零,如果让state的值为state-arg相减之后的值。

目前为此,我们以来了AQS框架来改写的重入锁代码如下:

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

/**

* 用AQS实现的重入锁

* @author 张仕宗

* @date 2018.11.9

*/

public class MyAqsLock implements Lock{

//AQS子类的对象,用它来辅助MyAqsLock工作

private Sync sync = new Sync();

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

//如果第一个线程进来,直接获得锁,并设置当前独占的线程为当前线程

int state = this.getState();

if(state == 0) { //state为0,说明当前没有线程占用该线程

if(this.compareAndSetState(0, arg)) { //判断当前state值,第一个线程进来,立刻设置state为arg

this.setExclusiveOwnerThread(Thread.currentThread()); //设置当前独占线程为当前线程

return true; //告诉顶级aqs获取锁成功

}

} else { //如果是第二个线程进来

Thread currentThread = Thread.currentThread();//当前进来的线程

Thread ownerThread = this.getExclusiveOwnerThread();//已经保存进去的独占式线程

if(currentThread == ownerThread) { //判断一下进来的线程和保存进去的线程是同一线程么?如果是,则获取锁成功,如果不是则获取锁失败

this.setState(state+arg); //设置state状态

return true;

}

}

return false;

}

@Override

protected boolean tryRelease(int arg) {

//锁的获取和锁的释放是一一对应的,获取过多少次锁就释放多少次锁

if(Thread.currentThread() != this.getExclusiveOwnerThread()) {

//如果释放锁的不是当前线程,则抛出异常

throw new RuntimeException();

}

int state = this.getState()-arg;

//接下来判断state是否已经归零,只有state归零的时候才真正的释放锁

if(state == 0) {

//state已经归零,做扫尾工作

this.setState(0);

this.setExclusiveOwnerThread(null);

return true;

}

this.setState(state);

return false;

}

public Condition newCondition() {

return new ConditionObject();

}

}

/**

* 上锁的方法

*/

@Override

public void lock() {

sync.acquire(1);

}

/**

* 释放锁的方法

*/

@Override

public void unlock() {

sync.release(1);

}

@Override

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

@Override

public boolean tryLock() {

//调用帮助器的tryAcquire方法,测试获取锁一次,不会自旋

return sync.tryAcquire(1);

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

//调用帮助器的tryRelease方法,测试释放锁一次,不会子旋

return sync.tryRelease(1);

}

@Override

public Condition newCondition() {

//调用帮助类获取Condition对象

return sync.newCondition();

}

}

以下大纲所有的内容群内已经将知识体系整理好(源码,笔记,PPT,学习视频)进群免费领取。

加QQ群:897889510,免费领取资料

以上大纲所有的内容群内已经将知识体系整理好(源码,笔记,PPT,学习视频)进群免费领取。加QQ群:897889510,免费领取资料

以上大纲所有的内容群内已经将知识体系整理好(源码,笔记,PPT,学习视频)进群免费领取。加QQ群:897889510,免费领取资料

以上大纲所有的内容群内已经将知识体系整理好(源码,笔记,PPT,学习视频)进群免费领取。加QQ群:897889510,免费领取资料

以上大纲所有的内容群内已经将知识体系整理好(源码,笔记,PPT,学习视频)进群免费领取。加QQ群:897889510,免费领取资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容