这里粘出的源码基于JDK1.8
部分方法的源码不是全部源码,删除掉了部分注释以及对主要逻辑没有影响的内容。
贴出源码目的主要是为了根据源码讲清逻辑。
主要内容包括:
- AQS简介
- CLH同步队列
- 同步状态的获取与释放
AQS简介
Java的内置锁(synchronized
关键字)一直都是备受争议的,在1.6之前这个重量级锁性能较为低下,虽然在1.6之后在基础层面进行了大量优化,但与Lock相比还是存在一些缺陷:
synch缺少获取锁与释放锁的可操作性,可中断,超时获取锁,且它为独占式的
为此Java提供了 Lock 组件,解决了synch关键字存在的问题,引入了更灵活的锁操作方式。
但是在Lock底层(以及绝大多数JUC同步组件的底层)是由一个非常关键由非常重要的组件支持的,这个组件就是AQS。
AQS
AQS AbstractQueuedSynchronizer
,即队列同步器,它是JUC并发包的核心组件。
它是构建JUC同步组件的基础框架,JUC并发包的作者(Dong Lea)期望它可以为实现大部分同步需求提供同步器。
AQS解决了实现同步器时涉及的大量细节实现问题,例如:获取同步状态,FIFO队列。基于AQS来构建同步器可以带来很多好处。不仅可以极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法,来管理同步状态。
AQS核心变量
1、volatile int state
AQS使用一个 volatile 修饰的 int 变量来表示同步状态,当 state>0 时,表示已经获取到了锁,当 state=0 时,表示释放了锁。
它提供了三个方法:
getState()
seState(int newState)
compareAndSetState(int expect, int update)
这三个方法用于对同步状态state进行操作,当然,AQS可以确保对state操作的安全性。
2、FIFO同步队列
AQS通过内置的FIFO同步队列,来完成资源获取线程的排队工作。
如果当前线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息,构造成一个节点(Node)并将其加入同步队列
同时,会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
AQS提供的方法
getState()
:返回同步状态的当前值
setState(int newState)
:设置当前同步状态
compareAndSetState(int expect, int update)
:使用CAS的方式设置当前状态,该方法能够保证状态设置的原子性。
tryAcquire(int arg)
:独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态之后,才能获取同步状态。
tryRelease(int arg)
:独占式释放同步状态
tryAcquireShared(int arg)
:共享式获取同步状态,返回值大于0表示获取成功,否则获取失败。
tryReleaseShared()
:共享式释放to怒状态
isHeldExclusively()
:当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占
acquire(int arg)
:在方法内部调用可重写的tryAcquire(int arg)方法,独占式获取同步转台,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待。
acquireInterruptibly(int arg)
:与 acquire(int arg)相同,但是该方法响应中断。当前线程未获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 异常并返回。
tryAcquireNanos(int arg, long nanos)
:可超时的获取同步状态,如果当前线程在 nanos 时间内没获取到同步状态,那么将会返回false,如果获取到了将返回true。
acquireShared(int arg)
:共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待。与独占式的主要区别是在同一时刻可以有多个下城获取到同步状态。
acquireSharedInturruptibly(int arg)
:共享式获取同步状态,响应中断。
tryAcquireSharedNanos(int arg, long nanosTimeout)
:共享式获取同步状态,增加超时限制。
release(int arg)
:独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
releaseShared(int arg)
:共享式释放同步状态
CLH同步队列
关于CLH锁的相关原理,可以看文末的参考资料,这里就不详细说了。
在上面讲到,AQS维护着一个FIFO队列,这个队列就是CLH队列。
CLH同步队列是一个FIFO双向队列,AQS依赖它,来完成同步状态的管理。
- 当线程获取同步状态失败时,AQS会将当前线程以及等待状态信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞该线程。
- 当同步状态释放时,会吧节点唤醒(公平锁),使其再次尝试获取同步状态
CLH Node
CLH等待队列,是一个以CLH锁为变量的队列。CLH锁通常用于自旋锁,这里用来阻塞同步器,同时在这个Node里保存了一个线程的相关控制信息。
在CLH同步队列中,一个节点表示一个线程,保存信息包括:
- 线程的引用(thread)
- 状态(waitStatus)
- 前驱节点(prev)
- 后继节点(next)
定义如下:
static final class Node {
/** 标记一个Node节点处于共享模式 */
static final Node SHARED = new Node();
/** 标记一个Node节点处于独占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
/** 因为超时或者中断,节点会被设置为取消状态,被取消的节点不会参与到竞争中,该节点会一直保持取消不会转变为其他状态 */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或被取消,将通知后继节点,使后继节点得以运行 */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
/** 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了singal()后,该节点将会从等待队列转移到同步队列中,加入到同步状态的获取中 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* 表示下一次共享式同步状态将会无条件的传播下去
*/
static final int PROPAGATE = -3;
/**
* 当前Node的等待状态。初始值为0,表示不属于以上任何状态
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
* 获取同步状态的线程
*/
volatile Thread thread;
/**
* 指向下一个等待中Condition的Node节点,或者 waitStatus为SHARED的节点
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
CLH同步队列
入列
addWaiter方法:
先快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点。
在源码中,两个方法都是通过一个 CAS 方法 cmopareAndSetTail(Node expect, Node update)
来设置尾节点,该方法可以确保节点是线程安全添加的。
在 enq(Node node)
方法中,AQS通过 自旋 的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从改方法返回,否则一直自旋重试。
过程如下:
源码如下::
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 新建node
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试添加尾节点,失败的话,调用enq来添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 死循环自旋
for (;;) {
Node t = tail;
// 如果tail 不在,则设置为首节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
出列
CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时,将自己设置为首节点。
这个过程非常简单,head执行该节点并断开原来首节点的next 和 当前节点的 prev 即可,注意在这个过程是不需要CAS来保证的,因为只有一个线程能够成功获取到同步状态。
过程图如下:
同步状态的获取与释放
自定义的子类使用AQS提供的模板方法可以实现自己的同步语义。
AQS提供了大量的模板方法来实现同步,主要分为三类:
1、独占式获取和释放同步状态
2、共享式获取和释放同步状态
3、查询同步队列中的等待线程情况
独占式
同一时刻仅有一个线程持有同步状态
独占式同步状态获取
aquire(int arg)
方法为AQS中提供的模板方法,该方法为独占式获取同步状态,会忽略中断,也就是说,由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。
各个方法定义如下:
1、tryAcquire:尝试获取锁,获取成功则设置锁状态并且返回true,否则返回false。该方法由自定义同步组件继承AQS后实现,该方法必须要保证线程安全的获取同步状态。
2、addWaiter:如果tryAcquire返回false(也就是获取同步锁失败),则调用该方法,将当前线程加入到CLH同步队列的队尾。
3、acquiredQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁成功为止。并且返回当前线程在等待过程中有没有中断过。
4、selfInterrupt:自己产生一个中断。
acquireQueued
方法为一个自旋的过程,也就是说,当前线程节点对象(Node)进入同步队列后,就会进入下一个自旋的工程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋中退出,否则一直执行下去。
从代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点时,才可以尝试获取同步状态。理由:
1、保持FIFO同步队列原则。
2、头结点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后于需要检查自己是否为头节点。
acquire方法流程图:
源码如下:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标记位
boolean interrupted = false;
// 死循环自旋
for (;;) {
// 获取当前节点的前驱
final Node p = node.predecessor();
// 如果前驱是头节点,并且tryAcquire成功,则设置后返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果失败,则判断线程是否需要等待,并且根据结果判断是否进入等待。
// 判断有中断后,设置interruped为true,并会在方法结束返回
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式获取响应中断
AQS提供的 acquire(int arg)
方法以独占方式获取同步状态,但是会忽略中断,对线程进行中断操作后,线程依然会位于CLH同步队列中等待获取同步状态。
为此,AQS提供了 acquireInterruptibly(int arg)
方法,该方法等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常 IntteruptedException
方法内部会首先校验该线程是否已经被中断,如果是,则直接抛出中断异常,否则,执行 tryAcquire(int arg)
方法尝试获取同步状态。
如果成功,则直接返回,否则执行 doAcquireInterruptibly(int arg)
。
doAcquireInterruptibly(int arg)
方法 与 acquire(int arg)
仅有两处差别:
1、方法声明会抛出 InterruptException
异常。
2、在中断方法处不再使用 interrupted 标志,而是直接抛出 InterruptedException
异常。
源码如下:
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// checkInterrupted 后,直接抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
TODO
(如果有什么错误或者建议,欢迎留言指出)
(本文内容是对各个知识点的转载整理,用于个人技术沉淀,以及大家学习交流用)
参考资料:
【死磕Java并发】—–J.U.C之AQS:AQS简介
【死磕Java并发】—–J.U.C之AQS:CLH同步队列
【死磕Java并发】—–J.U.C之AQS:同步状态的获取与释放
【死磕Java并发】—–J.U.C之AQS:阻塞和唤醒线程
源码:ThreadPoolExecutor(JDK1.8)