Java并发包基础——AQS源码解析

AQS

AbstractQueuedSynchronizer 提供了一个基于FIFO队列用于构建锁或其他相关同步装置的基础框架

继承关系

继承自AbstractOwnableSynchronizer

成员变量 序列号 serialVersionUID
线程(独占模式) exclusiveOwnerThread
函数 获取独占线程 getExclusiveOwnerThread()
设置独占线程 setExclusiveOwnerThread(Thread)

数据结构

Sync双向队列
Condition单向队列

Node

Node 每个被阻塞的线程都会被封装成一个Node结点,Node结点有前后Node的引用以及结点状态和结点对应线程 ​ 结点获取资源方式有共享式和独占式 ​

节点状态 含义
CANCELLED 表示此节点代表的线程已经被取消 1
默认状态,无含义 0
SIGNAL 表示此节点的下一个紧邻节点应该被unpark -1
CONDITION 表示此节点的线程正在等待条件 (Condition单向队列的节点只有Condition和Cancelled两种情况) -2
PROPAGATE 表示下一个aquireShared应该无条件传播 -3

成员变量

含义 变量
队列首 head
队列尾 tail
资源 state
自旋时间(默认1000) spinForTimeoutThreshold
Unsafe实例 unsafe
内存偏移地址,用于CAS stateOffset headOffset tailOffset waitStatusOffset nextOffset

成员函数

公共API

作用 接口
资源 查询资源数 getState
独占资源 获取资源(独占,无视中断) acquire(int 获取资源数)
获取资源(独占,可中断) acquireInterruptibly
获取资源(独占,可中断,限时) tryAcquireNanos
释放资源(独占) release
共享资源 获取资源(共享,无视中断) acquireShared
获取资源(共享,可中断) acquireSharedInterruptibly
获取资源(共享,可中断,限时) tryAcquireSharedNanos
释放资源(共享) releaseShared
队列线程 队列是否为空 hasQueuedThreads
是否有线程在获取资源 hasContended
获取队列中第一个非head线程 getFirstQueuedThread
线程是否在队列中 isQueued(Thread)
判断是否有其他线程排在当前线程前 hasQueuedPredecessors
获取队列长度 getQueueLength
获取队列中的线程集合 getQueuedThreads
获取队列中的独占线程集合 getExclusiveQueuedThreads
获取队列中的共享线程集合 getSharedQueuedThreads
条件 并发器是否持有指定条件 owns(ConditionObject)
并发器中指定条件的队列是否为空 hasWaiters(ConditionObject)
并发器中指定条件的队列长度 getWaitQueueLength(ConditionObject)
并发起指定条件的队列中所有线程的集合 getWaitingThreads(ConditionObject)

开放给子类的API

作用 接口
获取资源 getState()
设置资源 setState()
CAS设置资源 compareAndSetState()

子类需实现接口

作用 接口
该线程是否正在独占资源。只有用到condition才需要去实现它。 More ActionsisHeldExclusively()
尝试获取资源(独占方式),返回是否成功 tryAcquire(int)
尝试释放资源(独占方式),返回是否成功 tryRelease(int)
尝试获取资源(独占方式),返回负数 tryAcquireShared(int)
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 tryReleaseShared(int)

ConditionObject

为AQS增加了控制线程挂起与放下的接口,实现Condition接口

Condition接口:从Object抽取出与同步相关的方法组成的接口

条件等待

public final void await() throws InterruptedException {
 if (Thread.interrupted())
 throw new InterruptedException();
 //节点入列,当前线程封装为Condition型节点作为尾节点的nextWaiter插入队列(队列为空则作为头节点,尾节点Canceled则向上兼并所有Cancelled节点)
 Node node = addConditionWaiter();
 //释放当前资源数的资源并保存在临时变量中
 int savedState = fullyRelease(node);
 int interruptMode = 0;
 //节点不在Sync队列时——挂起
 while (!isOnSyncQueue(node)) {
 LockSupport.park(this);
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 break;
 }
 //入列获取资源
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 interruptMode = REINTERRUPT;
 if (node.nextWaiter != null) // clean up if cancelled
 unlinkCancelledWaiters();
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
}

唤醒

public final void signal() {
 if (!isHeldExclusively())
 throw new IllegalMonitorStateException();
 Node first = firstWaiter;
 if (first != null)
 doSignal(first);
}
​
​
public final void signalAll() {
 if (!isHeldExclusively())
 throw new IllegalMonitorStateException();
 Node first = firstWaiter;
 if (first != null)
 doSignalAll(first);
}
​
private void doSignal(Node first) {
 do {
 if ( (firstWaiter = first.nextWaiter) == null)
 lastWaiter = null;
 first.nextWaiter = null;
 } while (!transferForSignal(first) &&
 (first = firstWaiter) != null);
}
​
​
private void doSignalAll(Node first) {
 lastWaiter = firstWaiter = null;
 do {
 Node next = first.nextWaiter;
 first.nextWaiter = null;
 transferForSignal(first);
 first = next;
 } while (first != null);
}
final boolean transferForSignal(Node node) {
 //将节点状态更换为默认状态
 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 return false;
​
 //入列
 Node p = enq(node);
 int ws = p.waitStatus;
 //将节点前置节点设置为Signal节点,放下节点对应线程
 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 LockSupport.unpark(node.thread);
 return true;
}

源码分析

acquire

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
}

先插队尝试获取,失败则以独占模式入队并排队获取,排队获取失败的话中断线程

acquire流程图

以独占模式入队——addWaiter(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
 pred.next = node;
 return node;
 }
 }
 enq(node);
 return node;
}
addWaiter效果

排队获取资源

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {
 final Node p = node.predecessor();
 //p是等候队列第一位——尝试获取资源
 if (p == head && tryAcquire(arg)) {
 setHead(node);
 p.next = null;
 failed = false;
 return interrupted;
 }
 //未获取到资源,酌情考虑挂起线程或再次尝试获取
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 //出现异常亦未获取到资源,取消获取
 if (failed)
 cancelAcquire(node);
 }
}
acquireQueued流程图

尝试获取资源失败时判断是否应该挂起——shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 //前一个节点状态为SINGAL——它释放资源会唤醒下一位,可以安心挂起
 if (ws == Node.SIGNAL)
 return true;
 //前一个节点状态CANCELLED——已被取消,向前兼并所有CANCELLED节点,不可挂起
 if (ws > 0) {
 do {
 node.prev = pred = pred.prev;
 } while (pred.waitStatus > 0);
 pred.next = node;
 //前一个节点不是CANCELLED,设置为SINGAL,暂时不可挂起
 } else {
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}

取消获取资源——cancelAcquire

private void cancelAcquire(Node node) {
 if (node == null)
 return;
​
 //释放线程
 node.thread = null;
​
 // 兼并节点前所有CANCELLED节点
 Node pred = node.prev;
 while (pred.waitStatus > 0)
 node.prev = pred = pred.prev;
​
 Node predNext = pred.next;
​
 //直接修改状态为CANNELED——无需CAS是因为取消获取资源优先级较高
 node.waitStatus = Node.CANCELLED;
​
 // 如果此节点是尾部节点直接移除
 if (node == tail && compareAndSetTail(node, pred)) {
 compareAndSetNext(pred, predNext, null);

 } else {
 int ws;
 //要取消获取的节点不是等待中的第一个节点/前一个节点是SINGAL节点/前一个节点CAS为SAINGAL节点成功
 //直接删除此节点,在前一个节点后拼解后续节点
 if (pred != head &&
 ((ws = pred.waitStatus) == Node.SIGNAL ||
 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
 pred.thread != null) {
 Node next = node.next;
 if (next != null && next.waitStatus <= 0)
 compareAndSetNext(pred, predNext, next);
 } else {
 //取消获取的节点是等待中的第一个节点/前一个节点不是SINGAL节点
 //放下后续线程
 unparkSuccessor(node);
 }
​
 node.next = node; // help GC
 }
}

Release

public final boolean release(int arg) {
 //尝试释放资源,成功则放下后续节点并置换head
 if (tryRelease(arg)) {
 Node h = head;
 if (h != null && h.waitStatus != 0)
 unparkSuccessor(h);
 return true;
 }
 return false;
}

独占式获取释放资源只在释放时放下线程,因而通常情况下只有head持有资源而后续节点的线程统统被挂起

AcquireShared

public final void acquireShared(int arg) {
 if (tryAcquireShared(arg) < 0)
 doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
 //将当前线程封装成Shared型节点入队
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {
 final Node p = node.predecessor();
 //排在队首(前一位就是head)——尝试获取
 if (p == head) {
 int r = tryAcquireShared(arg);
 if (r >= 0) {
 //第一位获取成功——设置为head并doReleaseShared
 setHeadAndPropagate(node, r);
 p.next = null; // help GC
 if (interrupted)
 selfInterrupt();
 failed = false;
 return;
 }
 }
 //未成功获取——决定是否挂起
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
}

共享式获取资源在成功获取资源时会调用doReleaseShared放下后续被挂起的线程,只要资源充分,可同时有多个线程持有

releaseShared

public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {
 doReleaseShared();
 return true;
 }
 return false;
}
private void doReleaseShared() {    
 for (;;) {
 Node h = head;
 if (h != null && h != tail) {
 int ws = h.waitStatus;
 //head是Signal节点——设置为默认节点并释放后续节点
 if (ws == Node.SIGNAL) {
 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 continue;
 unparkSuccessor(h);
 }
 //head是默认节点——设置为Propagate节点
 else if (ws == 0 &&
 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 continue;                // loop on failed CAS
 }
 if (h == head)                   // loop if head changed
 break;
 }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,198评论 6 514
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,334评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,643评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,495评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,502评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,156评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,743评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,659评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,200评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,282评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,424评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,107评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,789评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,264评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,390评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,798评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,435评论 2 359

推荐阅读更多精彩内容