Java concurrent包源码走读(二)

简介

AQS(AbstractQueuedSynchronizer)是Java并发工具基础,要掌握Java并发工具类首先得熟悉AQS,通过对AQS的学习,我们将进一部理解共享锁、独占锁、公平锁、非公平锁、重入锁等常见锁。AQS作为一个模板方法,它定义并发工具类处理流程,接下来让我们来了解下AQS的处理流程。

AQS接口

AQS既然是基于模板方法实现,那它即提供了模板方法同时也提供可重新的方法。

可重写的方法

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待后去同步状态的线程将有机会获取同步状态
protected boolean tryAcquireShare(int arg) 共享式获取同步状态,返回大于等于0的值则成功,反之,获取失败
protected boolean tryReleaseShare(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否被当前线程独占

模板方法

方法名称 描述
void acquire(int arg) 独占式获取同步状态,当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,将会调用tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg),但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该线程会抛出InterrupteredException并返回
protected boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly基础上新增超时时间,如果当前线程在超时时间内没有获取到同步状态,返回false,如果获取到返回true
protected boolean acquireShared(int arg) 共享式获取同步状态,与独占的区别是在同一时刻可以友有多个线程获取同步状态
protected boolean tryAcquireInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
protected boolean tryAcquireSharedNanos(int arg,long nanos) 在tryAcquireInterruptibly(int arg)方法上新增超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态后 ,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 同步式的释放同步状态
Collection<Tread> getQueuedThreads() 获取等待在同步队列上的线程集合

AQS基本结构

AQS依赖内部的同步队列来完成同步状态的管理,同步队列中的节点(Node)用来保存"获取同步状态失败的线程"引用、等待状态以及前驱和后继节点。节点的属性类型与名称及描述:

属性类型和名称 描述
int waitStatus 等待状态:(1)CANCELLED,值为1,由于在同步队列中的线程等待超时或者被中断,需要从同步队列中取消等待 (2)SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果或者释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 (3)CONDITION,值是-2,节点在等待队列中,节点线程在Condition上,其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到该同步状态的获取中 (4)PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去 (5)INITIAL,值为0,初始状态
Node prev 前驱节点
Node next 后继节点
Node nextWaiter 等待队列的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARE常量,也就是说节点类型和等待队列中的后继节点共用同一个字段
Thread thread 获取同步状态的线程

根据Node基本属性可得下图:AQS基于一个双向链表实现的同步队列,同时有个基于单向链表实现的条件队列,注意这个条件队列不是必须的,它也可以有多个。

源码走读

独占式获取锁

首先我们看一个流程图,熟悉其大致流程

源码

//acquire(int arg) 
public final void acquire(int arg) {
        //首先调用tryAcquire(arg)方法,这个具体在子类中实现,通过这个方法可以实现公平锁和不公平锁,如果这个方法获取锁成功则直接返回,不再执行后面的代码
        //获取失败后便执行addWaiter(Node.EXCLUSIVE), arg)方法,将其添加到队列中,同时标记为独占锁
        //acquireQueued(final Node node, int arg)方法中首先尝试获取锁,如果获取不到则阻塞线程
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

//addWaiter(Node mode)
private Node addWaiter(Node mode) {
        //创建当前线程的Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //如果tail节点不为null
        if (pred != null) {
            //当前线程节点前驱指向tail
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//CAS 当前线程Node设置为tail成功
                //上个tail的后继节点指向现在的tail
                pred.next = node;
                //返回当前线程Node
                return node;
            }
        }
        enq(node);
        return node;
    }

//enq(node);
private Node enq(final Node node) {
        for (;;) {//自旋
            Node t = tail;
            if (t == null) { // 如果tail节点为null
                //初始化head,同时tail指向head
                //从后面的代码可以看出头结点是傀儡节点,
                //每次通过判读线程节点的前驱是否是头结点来决定是否获取锁
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//将node插入到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

//acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {自旋
                //获取当前线程节点的前驱继节点
                final Node p = node.predecessor();
                //如果前驱是头节点,尝试获取锁
                if (p == head && tryAcquire(arg)) {//获取成功
                    //设置当前节点是头节点
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                //获取失败阻塞其他线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取失败
            if (failed)
                //取消正在进行的尝试获取
                cancelAcquire(node);
        }
    }

// shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
       //如果前驱节点状态是Node.SIGNAL直接返回,
       //Node.SIGNAL表示前驱获取锁后会唤醒后继节点的线程
        return true;
    if (ws > 0) {
        //pred.waitStatus表示Node.CANCELLED,下面的操作跳过状态为CANCELLED的所有节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //waitStatus 等于0(初始化)或PROPAGATE,
        //说明线程还没有park,会先重试确定无法acquire到再park,
        //这里主要是共享锁时传播共享状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this);
        //返回线程状态同时清除标记位,在acquire(int arg)中会再次中断状态的中断线程
        return Thread.interrupted();
    }

//cancelAcquire(Node node)
private void cancelAcquire(Node node) {
    //如果节点为null,直接返回
    if (node == null)
        return;
    node.thread = null;
    //跳过当前节点的所有前驱是CANCELLED状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    //标记节点状态为Node.CANCELLED
    node.waitStatus = Node.CANCELLED;
    //如果当期节点是尾节点则直接删除
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //下面的操作是如果当前线程的节点的前驱不能满足唤醒后继的条件,则删除当前节点,
        //满足则唤醒当前节点的后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

独占式释放锁

public final boolean release(int arg) {
    if (tryRelease(arg)) {//tryRelease(arg)方法子类重新
        Node h = head;
        if (h != null && h.waitStatus != 0) //head存在
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

通过上面连个方法我们可以了解独占锁获取释放锁的流程,除去独占锁外还有共享锁、带有超时时间的独占式和共享锁。共享锁的获取锁后唤醒后继节点传播共享,具体大家可以查看源码。

自己简单的理解:其实具体的流程很简单,首先创建一个节点,head和tail都指向这个节点,然后各个线程去获取锁,其实就是判断它的前驱节点是不是头结点同时还有tryAcquire()是否返回true。如果获取成功设置当前节点为头节点,获取失败则将其插入到同步队列,设置状态为SIGNAL同时阻塞当前线程,SIGNAL可以保证前驱节点释放后唤醒当前节点,当前节点可以放心阻塞。然后每个出去头结点的节点开始自旋尝试获取锁,直到获取到锁返回。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容