前提#
并发编程大师Doug Lea在编写JUC(java.util.concurrent)包的时候引入了java.util.concurrent.locks.AbstractQueuedSynchronizer,其实是Abstract Queued Synchronizer,也就是"基于队列实现的抽象同步器",一般我们称之为AQS。其实Doug Lea大神编写AQS是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framewor》,可以在互联网找到相应的译文《JUC同步器框架》,如果想要深入研究AQS必须要理解一下该论文的内容,然后结合论文内容详细分析一下AQS的源码实现。本文在阅读AQS源码的时候选用的JDK版本是JDK11。
出于写作习惯,下文会把AbstractQueuedSynchronizer称为AQS、JUC同步器框或者同步器框架。
AQS的主要功能#
AQS是JUC包中用于构建锁或者其他同步组件(信号量、事件等)的基础框架类。AQS从它的实现上看主要提供了下面的功能:
- 同步状态的原子性管理。
- 线程的阻塞和解除阻塞。
- 提供阻塞线程的存储队列。
基于这三大功能,衍生出下面的附加功能:
- 通过中断实现的任务取消,此功能基于线程中断实现。
- 可选的超时设置,也就是调用者可以选择放弃等待任务执行完毕直接返回。
- 定义了Condition接口,用于支持管程形式的await/signal/signalAll操作,代替了Object类基于JNI提供的wait/notify/notifyAll。
AQS还根据同步状态的不同管理方式区分为两种不同的实现:独占状态的同步器和共享状态的同步器。
同步器框架基本原理#
《The java.util.concurrent Synchronizer Framework》一文中其实有提及到同步器框架的伪代码:
Copy// acquire操作如下: while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued; //release操作如下: update synchronization state; if (state may permit a blocked thread to acquire){ unblock one or more queued threads; }
撇脚翻译一下:
Copy// acquire操作如下: while(同步状态申请获取失败){ if(当前线程未进入等待队列){ 当前线程放入等待队列; } 尝试阻塞当前线程; } 当前线程移出等待队列 //release操作如下: 更新同步状态 if(同步状态足够允许一个阻塞的线程申请获取){ 解除一个或者多个等待队列中的线程的阻塞状态; }
为了实现上述操作,需要下面三个基本环节的相互协作:
- 同步状态的原子性管理。
- 等待队列的管理。
- 线程的阻塞与解除阻塞。
其实基本原理很简单,但是为了应对复杂的并发场景和并发场景下程序执行的正确性,同步器框架在上面的acquire操作和release操作中使用了大量的死循环和CAS等操作,再加上Doug Lea喜欢使用单行复杂的条件判断代码,如一个if条件语句会包含大量操作,AQS很多时候会让人感觉实现逻辑过于复杂。
同步状态管理#
AQS内部内部定义了一个32位整型的state变量用于保存同步状态:
Copy/** * The synchronization state.(同步状态值) */ private volatile int state; // 获取state protected final int getState() { return state; } // 直接覆盖设置state protected final void setState(int newState) { state = newState; } // CAS设置state protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
同步状态state在不同的实现中可以有不同的作用或者表示意义,这里其实不能单纯把它理解为中文意义上的"状态",它可以代表资源数、锁状态等等,下文遇到具体的场景我们再分析它表示的意义。
CLH队列与变体#
CLH锁即Craig, Landin, and Hagersten (CLH) locks,因为它底层是基于队列实现,一般也称为CLH队列锁。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。从实现上看,CLH锁是一种自旋锁,能确保无饥饿性,提供先来先服务的公平性。先看简单的CLH锁的一个简单实现:
Copypublic class CLHLock implements Lock { AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode()); ThreadLocal<QueueNode> pred; ThreadLocal<QueueNode> current; public CLHLock() { current = ThreadLocal.withInitial(QueueNode::new); pred = ThreadLocal.withInitial(() -> null); } @Override public void lock() { QueueNode node = current.get(); node.locked = true; QueueNode pred = tail.getAndSet(node); this.pred.set(pred); while (pred.locked) { } } @Override public void unlock() { QueueNode node = current.get(); node.locked = false; current.set(this.pred.get()); } static class QueueNode { boolean locked; } // 忽略其他接口方法的实现 }
上面是一个简单的CLH队列锁的实现,内部类QueueNode只使用了一个简单的布尔值locked属性记录了每个线程的状态,如果该属性为true,则相应的线程要么已经获取到锁,要么正在等待锁,如果该属性为false,则相应的线程已经释放了锁。新来的想要获取锁的线程必须对tail属性调用getAndSet()方法,使得自身成为队列的尾部,同时得到一个指向前驱节点的引用pred,最后线程所在节点在其前驱节点的locked属性上自旋,直到前驱节点释放锁。上面的实现是无法运行的,因为一旦自旋就会进入死循环导致CPU飙升,可以尝试使用下文将要提到的LockSupport进行改造。
CLH队列锁本质是使用队列(实际上是单向链表)存放等待获取锁的线程,等待的线程总是在其所在节点的前驱节点的状态上自旋,直到前驱节点释放资源。从实际来看,过度自旋带来的CPU性能损耗比较大,并不是理想的线程等待队列的实现。
基于原始的CLH队列锁中提供的等待队列的基本原理,AQS实现一种了CLH锁队列的变体(Variant)。AQS类的protected修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的:
- AQS实现的等待队列没有直接使用CLH锁队列,但是参考了其设计思路,等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。
- 每个节点都设计为一个持有单独的等待线程并且"带有具体的通知方式"的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
- 一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
- 等待队列中的节点通过prev属性连接前驱节点,通过next属性连接后继节点,简单来说,就是双向链表的设计。
- CLH队列本应该需要一个虚拟的头节点,但是在AQS中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
- Condition(条件)等待队列中的阻塞线程使用的是相同的Node结构,但是提供了另一个链表用来存放,Condition等待队列的实现比非Condition等待队列复杂。
线程阻塞与唤醒#
线程的阻塞和唤醒在JDK1.5之前,一般只能依赖于Object类提供的wait()、notify()和notifyAll()方法,它们都是JNI方法,由JVM提供实现,并且它们必须运行在获取监视器锁的代码块内(synchronized代码块中),这个局限性先不谈性能上的问题,代码的简洁性和灵活性是比较低的。JDK1.5引入了LockSupport类,底层是基于Unsafe类的park()和unpark()方法,提供了线程阻塞和唤醒的功能,它的机制有点像只有一个允许使用资源的信号量java.util.concurrent.Semaphore,也就是一个线程只能通过park()方法阻塞一次,只能调用unpark()方法解除调用阻塞一次,线程就会唤醒(多次调用unpark()方法也只会唤醒一次),可以想象是内部维护了一个0-1的计数器。
LockSupport类如果使用得好,可以提供更灵活的编码方式,这里举个简单的使用例子:
Copypublic class LockSupportMain implements Runnable { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private Thread thread; private void setThread(Thread thread) { this.thread = thread; } public static void main(String[] args) throws Exception { LockSupportMain main = new LockSupportMain(); Thread thread = new Thread(main, "LockSupportMain"); main.setThread(thread); thread.start(); Thread.sleep(2000); main.unpark(); Thread.sleep(2000); } @Override public void run() { System.out.println(String.format("%s-步入run方法,线程名称:%s", FORMATTER.format(LocalDateTime.now()), Thread.currentThread().getName())); LockSupport.park(); System.out.println(String.format("%s-解除阻塞,线程继续执行,线程名称:%s", FORMATTER.format(LocalDateTime.now()), Thread.currentThread().getName())); } private void unpark() { LockSupport.unpark(thread); } } // 某个时刻的执行结果如下: 2019-02-25 00:39:57.780-步入run方法,线程名称:LockSupportMain 2019-02-25 00:39:59.767-解除阻塞,线程继续执行,线程名称:LockSupportMain
LockSupport类park()方法也有带超时的变体版本方法,遇到带超时期限阻塞等待场景下不妨可以使用LockSupport#parkNanos()。
独占线程的保存#
AbstractOwnableSynchronizer是AQS的父类,一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能,这个类的源码很简单:
Copypublic abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } // 当前独占线程的瞬时实例 - 提供Getter和Setter方法 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
它就提供了一个保存独占线程的变量对应的Setter和Getter方法,方法都是final修饰的,子类只能使用不能覆盖。
CLH队列变体的实现#
这里先重点分析一下AQS中等待队列的节点AQS的静态内部类Node的源码:
Copystatic final class Node { // 标记一个节点处于共享模式下的等待 static final Node SHARED = new Node(); // 标记一个节点处于独占模式下的等待 static final Node EXCLUSIVE = null; // 取消状态 static final int CANCELLED = 1; // 唤醒状态 static final int SIGNAL = -1; // 条件等待状态 static final int CONDITION = -2; // 传播状态 static final int PROPAGATE = -3; // 等待状态,初始值为0,其他可选值是上面的4个值 volatile int waitStatus; // 当前节点前驱节点的引用 volatile Node prev; // 当前节点后继节点的引用 volatile Node next; // 当前节点持有的线程,可能是阻塞中等待唤醒的线程 volatile Thread thread; // 下一个等待节点 Node nextWaiter; // 当前操作的节点是否处于共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取当前节点的前驱节点,确保前驱节点必须存在,否则抛出NPE final Node predecessor() { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 空节点,主要是首次创建队列的时候创建的头和尾节点使用 Node() {} // 设置下一个等待节点,设置持有线程为当前线程 Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } // 设置waitStatus,设置持有线程为当前线程 Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); THREAD.set(this, Thread.currentThread()); } // CAS更新waitStatus final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update); } // CAS设置后继节点 final boolean compareAndSetNext(Node expect, Node update) { return NEXT.compareAndSet(this, expect, update); } // 设置前驱节点 final void setPrevRelaxed(Node p) { PREV.set(this, p); } // 下面是变量句柄的实现,在VarHandle出现之前使用的是Unsafe,其实底层还是照样使用Unsafe private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next", Node.class); PREV = l.findVarHandle(Node.class, "prev", Node.class); THREAD = l.findVarHandle(Node.class, "thread", Thread.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
其中,变量句柄(VarHandle)是JDK9引入的新特性,其实底层依赖的还是Unsafe的方法,笔者认为可以简单理解它为Unsafe的门面类,而定义的方法基本都是面向变量属性的操作。这里需要关注一下Node里面的几个属性:
- waitStatus:当前Node实例的等待状态,可选值有5个。
- 初始值整数0:当前节点如果不指定初始化状态值,默认值就是0,侧面说明节点正在等待队列中处于等待状态。
- Node#CANCELLED整数值1:表示当前节点实例因为超时或者线程中断而被取消,等待中的节点永远不会处于此状态,被取消的节点中的线程实例不会阻塞。
- Node#SIGNAL整数值-1:表示当前节点的后继节点是(或即将是)阻塞的(通过LockSupport#park()),当它释放或取消时,当前节点必须LockSupport#unpark()它的后继节点。
- Node#CONDITION整数值-2:表示当前节点是条件队列中的一个节点,当它转换为同步队列中的节点的时候,状态会被重新设置为0。
- Node#PROPAGATE整数值-3:此状态值通常只设置到调用了doReleaseShared()方法的头节点,确保releaseShared()方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。
- prev、next:当前Node实例的前驱节点引用和后继节点引用。
- thread:当前Node实例持有的线程实例引用。
- nextWaiter:这个值是一个比较容易令人生疑的值,虽然表面上它称为"下一个等待的节点",但是实际上它有三种取值的情况。
- 值为静态实例Node.EXCLUSIVE(也就是null),代表当前的Node实例是独占模式。
- 值为静态实例Node.SHARED,代表当前的Node实例是共享模式。
- 值为非Node.EXCLUSIVE和Node.SHARED的其他节点实例,代表Condition等待队列中当前节点的下一个等待节点。
Node类的等待状态waitStatus理解起来是十分费劲的,下面分析AQS其他源码段的时候会标识此状态变化的时机。
其实上面的Node类可以直接拷贝出来当成一个新建的类,然后尝试构建一个双向链表自行调试,这样子就能深刻它的数据结构。例如:
Copypublic class AqsNode { static final AqsNode SHARED = new AqsNode(); static final AqsNode EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile AqsNode prev; volatile AqsNode next; volatile Thread thread; AqsNode nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final AqsNode predecessor() { AqsNode p = prev; if (p == null) throw new NullPointerException(); else return p; } AqsNode() { } AqsNode(AqsNode nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } AqsNode(int waitStatus) { WAITSTATUS.set(this, waitStatus); THREAD.set(this, Thread.currentThread()); } final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update); } final boolean compareAndSetNext(AqsNode expect, AqsNode update) { return NEXT.compareAndSet(this, expect, update); } final void setPrevRelaxed(AqsNode p) { PREV.set(this, p); } private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(AqsNode.class, "next", AqsNode.class); PREV = l.findVarHandle(AqsNode.class, "prev", AqsNode.class); THREAD = l.findVarHandle(AqsNode.class, "thread", Thread.class); WAITSTATUS = l.findVarHandle(AqsNode.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } public static void main(String[] args) throws Exception { AqsNode head = new AqsNode(); AqsNode next = new AqsNode(AqsNode.EXCLUSIVE); head.next = next; next.prev = head; AqsNode tail = new AqsNode(AqsNode.EXCLUSIVE); next.next = tail; tail.prev = next; List<Thread> threads = new ArrayList<>(); for (AqsNode node = head; node != null; node = node.next) { threads.add(node.thread); } System.out.println(threads); } } // 某次执行的输出: [null, Thread[main,5,main], Thread[main,5,main]]
实际上,AQS中一共存在两种等待队列,其中一种是普通的同步等待队列,这里命名为Sync Queue,另一种是基于Sync Queue实现的条件等待队列,这里命名为Condition Queue。
理解同步等待队列#
前面已经介绍完AQS的同步等待队列节点类,下面重点分析一下同步等待队列的相关源码,下文的Sync队列、Sync Queue、同步队列和同步等待队列是同一个东西。首先,我们通过分析Node节点得知Sync队列一定是双向链表,AQS中有两个瞬时成员变量用来存放头节点和尾节点:
Copy// 头节点引用(注意由transient volatile修饰,不会序列化,并且写操作会马上刷新到主内存) private transient volatile Node head; // 尾节点引用(注意由transient volatile修饰,不会序列化,并且写操作会马上刷新到主内存) private transient volatile Node tail; // 变量句柄相关,用于CAS操作头尾节点 private static final VarHandle STATE; private static final VarHandle HEAD; private static final VarHandle TAIL; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class); HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class); TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // 确保LockSupport类已经初始化 - 这里应该是为了修复之前一个因为LockSupport未初始化导致的BUG Class<?> ensureLoaded = LockSupport.class; } // 初始化同步队列,注意初始化同步队列的时候,头尾节点都是指向同一个新的Node实例 private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; } // CAS设置同步队列的尾节点 private final boolean compareAndSetTail(Node expect, Node update) { return TAIL.compareAndSet(this, expect, update); } // 设置头节点,重点注意这里:传入的节点设置成头节点之后,前驱节点和持有的线程会置为null,这是因为: // 1.头节点一定没有前驱节点。 // 2.当节点被设置为头节点,它所在的线程一定是已经解除了阻塞。 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
当前线程加入同步等待队列和同步等待队列的初始化是同一个方法,前文提到过:同步等待队列的初始化会延迟到第一次可能出现竞争的情况,这是为了避免无谓的资源浪费,具体方法是addWaiter(Node mode):
Copy// 添加等待节点到同步等待队列,实际上初始化队列也是这个方法完成的 private Node addWaiter(Node mode) { // 基于当前线程创建一个新节点,节点的模式由调用者决定 Node node = new Node(mode); for (;;) { Node oldTail = tail; // 尾节点不为空说明队列已经初始化过,则把新节点加入到链表中,作为新的尾节点,建立和前驱节点的关联关系 if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { // 尾节点为空说明队列尚未初始化过,进行一次初始化操作 initializeSyncQueue(); } } }
在首次调用addWaiter()方法,死循环至少执行两轮再跳出,因为同步队列必须初始化完成后(第一轮循环),然后再把当前线程所在的新节点实例添加到等待队列中再返(第二轮循环)当前的节点,这里需要注意的是新加入同步等待队列的节点一定是添加到队列的尾部并且会更新AQS中的tail属性为最新入队的节点实例。
假设我们使用Node.EXCLUSIVE模式把新增的等待线程加入队列,例如有三个线程分别是thread-1、thread-2和thread-3,线程入队的时候都处于阻塞状态,模拟一下依次调用上面的入队方法的同步队列的整个链表的状态。
先是线程thread-1加入等待队列:
接着是线程thread-2加入等待队列:
最后是线程thread-3加入等待队列:
如果仔细研究会发现,如果所有的入队线程都处于阻塞状态的话,新入队的线程总是添加到队列的tail节点,阻塞的线程总是"争抢"着成为head节点,这一点和CLH队列锁的阻塞线程总是基于前驱节点自旋以获取锁的思路是一致的。下面将会分析的独占模式与共享模式,线程加入等待队列都是通过addWaiter()方法。
理解条件等待队列#
前面已经相对详细地介绍过同步等待队列,在AQS中还存在另外一种相对特殊和复杂的等待队列-条件等待队列。介绍条件等待队列之前,要先介绍java.util.concurrent.locks.Condition接口。
Copypublic interface Condition { // 当前线程进入等待状态直到被唤醒或者中断 void await() throws InterruptedException; // 当前线程进入等待状态,不响应中断,阻塞直到被唤醒 void awaitUninterruptibly(); // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 long awaitNanos(long nanosTimeout) throws InterruptedException; // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 boolean await(long time, TimeUnit unit) throws InterruptedException; // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 boolean awaitUntil(Date deadline) throws InterruptedException; // 唤醒单个阻塞线程 void signal(); // 唤醒所有阻塞线程 void signalAll(); }
Condition可以理解为Object中的wait()、notify()和notifyAll()的替代品,因为Object中的相应方法是JNI(Native)方法,由JVM实现,对使用者而言并不是十分友好(有可能伴随JVM版本变更而受到影响),而Condition是基于数据结构和相应算法实现对应的功能,我们可以从源码上分析其实现。
Condition的实现类是AQS的公有内部类ConditionObject。ConditionObject提供的入队列方法如下:
Copypublic class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ - 条件队列的第一个节点 private transient Node firstWaiter; /** Last node of condition queue. */ - 条件队列的最后一个节点 private transient Node lastWaiter; // 公有构造函数 public ConditionObject() { } // 添加条件等待节点 private Node addConditionWaiter() { // 这里做一次判断,当前线程必须步入此同步器实例 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 临时节点t赋值为lastWaiter引用 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 最后一个节点不为条件等待状态,则是取消状态 if (t != null && t.waitStatus != Node.CONDITION) { // 解除所有取消等待的节点的连接 unlinkCancelledWaiters(); t = lastWaiter; } // 基于当前线程新建立一个条件等待类型的节点 Node node = new Node(Node.CONDITION); // 首次创建Condition的时候,最后一个节点临时引用t为null,则把第一个节点置为新建的节点 if (t == null) firstWaiter = node; else // 已经存在第一个节点,则通过nextWaiter连接新的节点 t.nextWaiter = node; // 最后一个节点的引用更新为新节点的引用 lastWaiter = node; return node; } // 从条件等待队列解除所有取消等待的节点的连接,其实就是所有取消节点移除的操作,涉及到双向链表的断链操作、第一个和最后一个节点的引用更新 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 注意这里等待状态的判断 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // 当前同步器实例持有的线程是否当前线程(currentThread()) protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } // 暂时不分析其他方法 }
实际上,Condition的所有await()方法变体都调用addConditionWaiter()添加阻塞线程到条件队列中。我们按照分析同步等待队列的情况,分析一下条件等待队列。正常情况下,假设有2个线程thread-1和thread-2进入条件等待队列,都处于阻塞状态。
先是thread-1进入条件队列:
然后是thread-2进入条件队列:
条件等待队列看起来也并不复杂,但是它并不是单独存在和使用的,一般依赖于同步等待队列,下面的一节分析Condition的实现的时候再详细分析。
独占模式与共享模式#
前文提及到,同步器涉及到独占模型和共享模式。下面就针对这两种模式详细分析一下AQS的具体实现源码。
独占模式#
AQS同步器如果使用独占(EXCLUSIVE)模式,那么意味着同一个时刻,只有唯一的一个节点所在线程获取(acuqire)原子状态status成功,此时该线程可以从阻塞状态解除继续运行,而同步等待队列中的其他节点持有的线程依然处于阻塞状态。独占模式同步器的功能主要由下面的四个方法提供:
- acquire(int arg):申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
- acquireInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
- tryAcquireNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
- release(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。
独占模式下,AQS同步器实例初始化时候传入的status值,可以简单理解为"允许申请的资源数量的上限值",下面的acquire类型的方法暂时称为"获取资源",而release方法暂时称为"释放资源"。接着我们分析前面提到的四个方法的源码,先看acquire(int arg):
Copypublic final void acquire(int arg) { // 获取资源成功或者新增一个独占类型节点到同步等待队列成功则直接返回,否则中断当前线程 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 此方法必须又子类覆盖,用于决定是否获取资源成功 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 中断当前线程 static void selfInterrupt() { Thread.currentThread().interrupt(); } // 不可中断的独占模式下,同步等待队列中的线程获取资源的方法 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { // 获取新入队节点的前驱节点 final Node p = node.predecessor(); // 前驱节点为头节点并且尝试获取资源成功,也就是每一轮循环都会调用tryAcquire尝试获取资源,除非阻塞或者跳出循环 if (p == head && tryAcquire(arg)) { // 设置新入队节点为头节点,原来的节点会从队列中断开 setHead(node); p.next = null; // help GC return interrupted; // <== 注意,这个位置是跳出死循环的唯一位置 } // 判断是否需要阻塞当前获取资源失败的节点中持有的线程 if (shouldParkAfterFailedAcquire(p, node)) // 阻塞当前线程,如果被唤醒则返回并清空线程的中断标记 interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } /** * 检查并且更新获取资源失败的节点的状态,返回值决定线程是否需要被阻塞。 * 这个方法是所有循环获取资源方法中信号控制的主要方法 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 这里记住ws是当前处理节点的前驱节点的等待状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前驱节点状态设置成Node.SIGNAL成功,等待被release调用释放,后继节点可以安全地进入阻塞状态 return true; if (ws > 0) { // ws大于0只有一种情况Node.CANCELLED,说明前驱节点已经取消获取资源, // 这个时候会把所有这类型取消的前驱节点移除,找到一个非取消的节点重新通过next引用连接当前节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 其他等待状态直接修改前驱节点等待状态为Node.SIGNAL pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } // 阻塞当前线程,获取并且重置线程的中断标记位 private final boolean parkAndCheckInterrupt() { // 这个就是阻塞线程的实现,依赖Unsafe的API LockSupport.park(this); return Thread.interrupted(); }
上面的代码虽然看起来能基本理解,但是最好用图推敲一下"空间上的变化":
接着分析一下release(int arg)的实现:
Copy// 释放资源
public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 尝试释放资源,独占模式下,尝试通过重新设置status的值从而实现释放资源的功能
// 这个方法必须由子类实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 解除传入节点(一般是头节点)的第一个后继节点的阻塞状态,当前处理节点的等待状态会被CAS更新为0
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 当前处理的节点(一般是头节点)状态小于0则直接CAS更新为0
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果节点的第一个后继节点为null或者等待状态大于0(取消),则从等待队列的尾节点向前遍历,
// 找到最后一个不为null,并且等待状态小于等于0的节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 解除上面的搜索到的节点的阻塞状态
if (s != null)
LockSupport.unpark(s.thread);
}
接着用上面的图:
上面图中thread-2晋升为头节点的第一个后继节点,等待下一个release()释放资源唤醒之就能晋升为头节点,一旦晋升为头节点也就是意味着可以解除阻塞继续运行。接着我们可以看acquire()的响应中断版本和带超时的版本。先看acquireInterruptibly(int arg):
Copypublic final void acquireInterruptibly(int arg) throws InterruptedException { // 获取并且清空线程中断标记位,如果是中断状态则直接抛InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); // 如果获取资源失败 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 独占模式下响应中断的获取资源方法 private void doAcquireInterruptibly(int arg) throws InterruptedException { // 基于当前线程新增一个独占的Node节点进入同步等待队列中 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } // 获取资源失败进入阻塞状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 解除阻塞后直接抛出InterruptedException异常 throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
doAcquireInterruptibly(int arg)方法和acquire(int arg)类似,最大的不同点在于阻塞线程解除阻塞后并不是正常继续运行,而是直接抛出InterruptedException异常。最后看tryAcquireNanos(int arg, long nanosTimeout)的实现:
Copy// 独占模式下尝试在指定超时时间内获取资源,响应线程中断 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } // 独占模式下带超时时间限制的获取资源方法 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 超时期限小于0纳秒,快速失败 if (nanosTimeout <= 0L) return false; // 超时的最终期限是当前系统时钟纳秒+外部指定的nanosTimeout增量 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } // 计算出剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); // 剩余超时时间小于0说明已经超时则取消获取 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 这里会判断剩余超时时间大于1000纳秒的时候才会进行带超时期限的线程阻塞,否则会进入下一轮获取尝试 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
tryAcquireNanos(int arg, long nanosTimeout)其实和doAcquireInterruptibly(int arg)类似,它们都响应线程中断,不过tryAcquireNanos()在获取资源的每一轮循环尝试都会计算剩余可用的超时时间,只有同时满足获取失败需要阻塞并且剩余超时时间大于SPIN_FOR_TIMEOUT_THRESHOLD(1000纳秒)的情况下才会进行阻塞。
独占模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,然后头结点的第一个有效的后继节点继续开始竞争资源。
使用独占模式同步器的主要类库有:
- 可重入锁ReentrantLock。
- 读写锁ReentrantReadWriteLock中的写锁WriteLock。
共享模式#
共享(SHARED)模式中的"共享"的含义是:同一个时刻,如果有一个节点所在线程获取(acuqire)原子状态status成功,那么它会解除阻塞被唤醒,并且会把唤醒状态传播到所有有效的后继节点(换言之就是唤醒整个同步等待队列中的所有有效的节点)。共享模式同步器的功能主要由下面的四个方法提供:
- acquireShared(int arg):申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
- acquireSharedInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
- tryAcquireSharedNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
- releaseShared(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。
先看acquireShared(int arg)的源码:
Copy// 共享模式下获取资源 public final void acquireShared(int arg) { // 注意tryAcquireShared方法值为整型,只有小于0的时候才会加入同步等待队列 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 共享模式下尝试获取资源,此方法需要由子类覆盖 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享模式下获取资源和处理同步等待队列的方法 private void doAcquireShared(int arg) { // 基于当前线程新建一个标记为共享的新节点 final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); // 如果当前节点的前驱节点是头节点 if (p == head) { // 每一轮循环都会调用tryAcquireShared尝试获取资源,除非阻塞或者跳出循环 int r = tryAcquireShared(arg); if (r >= 0) { // <= tryAcquireShared方法>=0说明直资源获取成功 // 设置头结点,并且传播获取资源成功的状态,这个方法的作用是确保唤醒状态传播到所有的后继节点 // 然后任意一个节点晋升为头节点都会唤醒其第一个有效的后继节点,起到一个链式释放和解除阻塞的动作 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 判断获取资源失败是否需要阻塞,这里会把前驱节点的等待状态CAS更新为Node.SIGNAL if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } } // 设置同步等待队列的头节点,判断当前处理的节点的后继节点是否共享模式的节点,如果共享模式的节点, // propagate大于0或者节点的waitStatus为PROPAGATE则进行共享模式下的释放资源 private void setHeadAndPropagate(Node node, int propagate) { // h为头节点的中间变量 Node h = head; // 设置当前处理节点为头节点 setHead(node); // 这个判断条件比较复杂:入参propagate大于0 || 头节点为null || 头节点的状态为非取消 || 再次获取头节点为null || 再次获取头节点不为取消 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 当前节点(其实已经成为头节点)的第一个后继节点为null或者是共享模式的节点 if (s == null || s.isShared()) doReleaseShared(); } } // Release action for shared mode:共享模式下的释放资源动作 private void doReleaseShared() { for (;;) { Node h = head; // 头节点不为null并且不为尾节点 if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点等待状态为SIGNAL(-1)则CAS更新它为0,更新成功后唤醒和解除其后继节点的阻塞 if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // 唤醒头节点的后继节点 unparkSuccessor(h); } // 如果头节点的等待状态为0,则CAS更新它为PROPAGATE(-3) else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; } // 头节点没有变更,则跳出循环 if (h == head) break; } }
其实代码的实现和独占模式有很多类似的地方,一个很大的不同点是:共享模式同步器当节点获取资源成功晋升为头节点之后,它会把自身的等待状态通过CAS更新为Node.PROPAGATE,下一个加入等待队列的新节点会把头节点的等待状态值更新回Node.SIGNAL,标记后继节点处于可以被唤醒的状态,如果遇上资源释放,那么这个阻塞的节点就能被唤醒从而解除阻塞。我们还是画图理解一下,先假设tryAcquireShared(int arg)总是返回小于0的值,入队两个阻塞的线程thread-1和thread-2,然后进行资源释放确保tryAcquireShared(int arg)总是返回大于0的值:
看起来和独占模式下的同步等待队列差不多,实际上真正不同的地方在于有节点解除阻塞和晋升为头节点的过程。因此我们可以先看releaseShared(int arg)的源码:
Copy// 共享模式下释放资源 public final boolean releaseShared(int arg) { // 尝试释放资源成功则调用前面分析过的doReleaseShared以传播唤醒状态和unpark头节点的后继节点 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 共享模式下尝试释放资源,必须由子类覆盖 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
releaseShared(int arg)就是在tryReleaseShared(int arg)调用返回true的情况下主动调用一次doReleaseShared()从而基于头节点传播唤醒状态和unpark头节点的后继节点。接着之前的图:
接着看acquireSharedInterruptibly(int arg)的源码实现:
Copy// 共享模式下获取资源的方法,响应线程中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 和非响应线程中断的acquireShared方法类似,不过这里解除阻塞之后直接抛出异常InterruptedException if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
最后看tryAcquireSharedNanos(int arg, long nanosTimeout)的源码实现:
Copy// 共享模式下获取资源的方法,带超时时间版本 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 注意这里只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都认为获取资源成功 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算超时的最终期限 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } //重新计算剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); // 超时的情况下直接取消获取 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 满足阻塞状态并且剩余的超时时间大于阀值1000纳秒则通过LockSupport.parkNanos()阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); // 解除阻塞后判断线程的中断标记并且清空标记位,如果是处于中断状态则抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
共享模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,重新设置头节点的过程会传播唤醒的状态,简单来说就是唤醒一个有效的后继节点,只要一个节点可以晋升为头节点,它的后继节点就能被唤醒,以此类推。节点的唤醒顺序遵循类似于FIFO的原则,通俗说就是先阻塞或者阻塞时间最长则先被唤醒。
使用共享模式同步器的主要类库有:
- 信号量Semaphore。
- 倒数栅栏CountDownLatch。
Condition的实现#
Condition实例的建立是在Lock接口的newCondition()方法,它是锁条件等待的实现,基于作用或者语义可以见Condition接口的相关API注释:
Condition是对象监视器锁方法Object#wait()、Object#notify()和Object#notifyAll()的替代实现,对象监视器锁实现锁的时候作用的效果是每个锁对象必须使用多个wait-set(JVM内置的等待队列),通过Object提供的方法和监视器锁结合使用就能达到Lock的实现效果。如果替换synchronized方法和语句并且结合使用Lock和Condition,就能替换并且达到对象监视器锁的效果。
Condition必须固有地绑定在一个Lock的实现类上,也就是要通过Lock的实例建立Condition实例,而且Condition的方法调用使用必须在Lock的"锁定代码块"中,这一点和synchronized关键字以及Object的相关JNI方法使用的情况十分相似。
前文介绍过Condition接口提供的方法以及Condition队列,也就是条件等待队列,通过画图简单介绍了它的队列节点组成。实际上,条件等待队列需要结合同步等待队列使用,这也刚好对应于前面提到的Condition的方法调用使用必须在Lock的锁定代码块中。听起来很懵逼,我们慢慢分析一下ConditionObject的方法源码就能知道具体的原因。
先看ConditionObject#await()方法:
Copy// 退出等待后主动进行中断当前线程 private static final int REINTERRUPT = 1; // 退出等待后抛出InterruptedException异常 private static final int THROW_IE = -1; /** * 可中断的条件等待实现 * 1、当前线程处于中断状态则抛出InterruptedException * 2、保存getState返回的锁状态,并且使用此锁状态调用release释放所有的阻塞线程 * 3、线程加入等待队列进行阻塞,直到signall或者中断 * 4、通过保存getState返回的锁状态调用acquire方法 * 5、第4步中阻塞过程中中断则抛出InterruptedException */ public final void await() throws InterruptedException { // 如果线程是中断状态则清空中断标记位并且抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 当前线程所在的新节点加入条件等待队列 Node node = addConditionWaiter(); // 释放当前AQS中的所有资源返回资源的status保存值,也就是基于status的值调用release(status) - 其实这一步是解锁操作 int savedState = fullyRelease(node); // 初始化中断模式 int interruptMode = 0; // 如果节点新建的节点不位于同步队列中(理论上应该是一定不存在),则对节点所在线程进行阻塞,第二轮循环理论上节点一定在同步等待队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 处理节点所在线程中断的转换操作 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 节点所在线程被唤醒后,如果节点所在线程没有处于中断状态,则以独占模式进行头节点竞争 // 注意这里使用的status是前面释放资源时候返回的保存下来的status if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 下一个等待节点不空,则从等待队列中移除所有取消的等待节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // interruptMode不为0则按照中断模式进行不同的处理 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 释放当前AQS中的所有资源,其实也就是基于status的值调用release(status) // 这一步对于锁实现来说,就是一个解锁操作 final int fullyRelease(Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { // 释放失败则标记等待状态为取消 node.waitStatus = Node.CANCELLED; throw t; } } // 传入的节点是否在同步队列中 final boolean isOnSyncQueue(Node node) { // 节点等待您状态为CONDITION或者前驱节点为null则返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 因为等待队列是通过nextWaiter连接,next引用存在说明节点位于同步队列 if (node.next != null) return true; // 从同步队列的尾部向前遍历是否存在传入的节点实例 return findNodeFromTail(node); } // 从同步队列的尾部向前遍历是否存在传入的节点实例 private boolean findNodeFromTail(Node node) { for (Node p = tail;;) { if (p == node) return true; if (p == null) return false; p = p.prev; } } // 这是一个很复杂的判断,用了两个三目表达式,作用是如果新建的等待节点所在线程中断, // 则把节点的状态由CONDITION更新为0,并且加入到同步等待队列中,返回THROW_IE中断状态,如果加入同步队列失败,返回REINTERRUPT // 如果新建的等待节点所在线程没有中断,返回0,也就是初始状态的interruptMode private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } // 节点线程中断取消等待后的转换操作 final boolean transferAfterCancelledWait(Node node) { // CAS更新节点的状态由CONDITION更改为0 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { // 节点加入同步等待队列 enq(node); return true; } // 这里尝试自旋,直到节点加入同步等待队列成功 while (!isOnSyncQueue(node)) Thread.yield(); return false; } // 等待完毕后报告中断处理,前边的逻辑得到的interruptMode如果为THROW_IE则抛出InterruptedException,如果为REINTERRUPT则中断当前线程 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
其实上面的await()逻辑并不复杂,前提是理解了对象监视器锁那套等待和唤醒的机制(由JVM实现,C语言学得好的可以去看下源码),这里只是通过算法和数据结构重新进行了一次实现。await()主要使用了两个队列:同步等待队列和条件等待队列。我们先假设有两个线程thread-1和thread-2调用了下面的代码中的process()方法:
CopyReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void process(){ try{ lock.lock(); condition.await(); // 省略其他逻辑... }finally{ lock.unlock(); } }
ReentrantLock使用的是AQS独占模式的实现,因此在调用lock()方法的时候,同步等待队列的一个瞬时快照(假设线程thread-1先加入同步等待队列)可能如下:
接着,线程thread-1所在节点是头节点的后继节点,获取锁成功,它解除阻塞后可以调用await()方法,这个时候会释放同步等待队列中的所有等待节点,也就是线程thread-2所在的节点也被释放,因此线程thread-2也会调用await()方法:
只要有线程能够到达await()方法,那么原来的同步器中的同步等待队列就会释放所有阻塞节点,表现为释放锁,然后这些释放掉的节点会加入到条件等待队列中,条件等待队列中的节点也是阻塞的,这个时候只有通过signal()或者signalAll()进行队列元素转移才有机会唤醒阻塞的线程。因此接着看signal()和signalAll()的源码实现:
Copy// 从等待队列中移动一个等待时间最长的线程(如果过存在的话)到锁同步等待队列中 public final void signal() { // 判断当前线程是否和独占线程一致,其实就是此操作需要在锁代码块中执行 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 基于第一个等待节点进行Signal操作 private void doSignal(Node first) { do { // 首节点的下一个等待节点为空,说明只剩下一个等待节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 当前处理节点从链表从移除 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 唤醒的转换操作 final boolean transferForSignal(Node node) { // CAS更新节点状态由CONDITION到0,更新失败则返回false不唤醒 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 节点作为新节点重新加入到同步等待队列 Node p = enq(node); int ws = p.waitStatus; // 取消或者更新节点等待状态为SIGNAL的节点需要解除阻塞进行重新同步,这里的操作只针对取消和状态异常的节点 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 从等待队列中移动所有等待时间最长的线程(如果过存在的话)到锁同步等待队列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } // 基于第一个等待节点进行SignalAll操作 private void doSignalAll(Node first) { // 置空lastWaiter和firstWaiter lastWaiter = firstWaiter = null; do { // 获取下一个等待节点 Node next = first.nextWaiter; // 当前处理节点从链表从移除 first.nextWaiter = null; // 处理当前节点 transferForSignal(first); // 更新中间引用 first = next; } while (first != null); }
其实signal()或者signalAll()会对取消的节点或者短暂中间状态的节点进行解除阻塞,但是正常情况下,它们的操作结果是把阻塞等待时间最长的一个或者所有节点重新加入到AQS的同步等待队列中。例如,上面的例子调用signal()方法后如下:
这样子,相当于线程thread-1重新加入到AQS同步等待队列中(从条件等待队列中移动到同步等待队列中),并且开始竞争头节点,一旦竞争成功,就能够解除阻塞。这个时候从逻辑上看,signal()方法最终解除了对线程thread-1的阻塞。await()的其他变体方法的原理是类似的,这里因为篇幅原因不再展开。这里小结一下Condition的显著特点:
- 1、同时依赖两个同步等待队列,一个是AQS提供,另一个是ConditionObject提供的。
- 2、await()方法会释放AQS同步等待队列中的阻塞节点,这些节点会加入到条件等待队列中进行阻塞。
- 3、signal()或者signalAll()会把条件等待队列中的节点重新加入AQS同步等待队列中,并不解除正常节点的阻塞状态。
- 4、接第3步,这些进入到AQS同步等待队列的节点会重新竞争成为头节点,接下来的步骤其实也就是前面分析过的独占模式下的AQS的运作原理。
取消获取资源(cancelAcquire)#
新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候,就会调用cancelAcquire()方法,用于取消该节点对应节点获取资源的操作。
Copy// 取消节点获取资源的操作 private void cancelAcquire(Node node) { // 节点为null直接返回 if (node == null) return; // 置空节点持有的线程,因为此时节点线程已经发生中断 node.thread = null; Node pred = node.prev; // 这个循环是为了获取当前节点的上一个不为取消状态的节点,也就是中间如果发生了取消的节点都直接断开 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 保存当前节点的上一个不为取消状态的节点的后继节点 Node predNext = pred.next; // 当前节点等待状态更新为CANCELLED node.waitStatus = Node.CANCELLED; // 如果当前节点为尾节点,则直接更新尾节点为当前节点的上一个不为取消状态的节点 if (node == tail && compareAndSetTail(node, pred)) { // 然后更新该节点的后继节点为null,因为它已经成为新的尾节点 pred.compareAndSetNext(predNext, null); } else { int ws; // 当前节点的上一个不为取消状态的节点已经不是头节点的情况,需要把当前取消的节点从AQS同步等待队列中断开 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 当前节点的上一个不为取消状态的节点已经是头节点,相当于头节点之后的节点都是取消,需要唤醒当前节点的后继节点 unparkSuccessor(node); } // 节点后继节点设置为自身,那么就不会影响后继节点 node.next = node; } }
cancelAcquire()方法有多处调用,主要包括下面的情况:
- 1、节点线程在阻塞过程中主动中断的情况下会调用。
- 2、acquire的处理过程发生任何异常的情况下都会调用,包括tryAcquire()、tryAcquireShared()等。
- 3、新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候。
cancelAcquire()主要作用是把取消的节点移出同步等待队列,必须时候需要进行后继节点的唤醒。
实战篇#
AQS是一个抽象的同步器基础框架,其实我们也可以直接使用它实现一些高级的并发框架。下面基于AQS实现一些非内建的功能,这两个例子来自于AQS的注释中。
metux#
大学C语言课程中经常提及到的只有一个资源的metux(互斥区),也就是说,同一个时刻,只能有一个线程获取到资源,其他获取资源的线程需要阻塞等待到前一个线程释放资源。
Copypublic class Metux implements Lock, Serializable { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { assert 1 == arg; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { assert 1 == arg; if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } public Condition newCondition() { return new ConditionObject(); } public boolean isLocked() { return getState() != 0; } @Override public boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); } } private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } public boolean isLocked() { return sync.isLocked(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } public static void main(String[] args) throws Exception { final Metux metux = new Metux(); new Thread(() -> { metux.lock(); System.out.println(String.format("%s-thread-1获取锁成功休眠3秒...", LocalDateTime.now())); try { Thread.sleep(3000); } catch (InterruptedException e) { //ignore } metux.unlock(); System.out.println(String.format("%s-thread-1获解锁成功...", LocalDateTime.now())); return; }, "thread-1").start(); new Thread(() -> { metux.lock(); System.out.println(String.format("%s-thread-2获取锁成功...",LocalDateTime.now())); return; }, "thread-2").start(); Thread.sleep(Integer.MAX_VALUE); } }
某个时间的某次运行结果如下:
Copy2019-04-07T11:49:27.858791200-thread-1获取锁成功休眠3秒... 2019-04-07T11:49:30.876567-thread-2获取锁成功... 2019-04-07T11:49:30.876567-thread-1获解锁成功...
二元栅栏#
二元栅栏是CountDownLatch的简化版,只允许一个线程阻塞,由另一个线程负责唤醒。
Copypublic class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } @Override protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } @Override protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public static void main(String[] args) throws Exception { BooleanLatch latch = new BooleanLatch(); new Thread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { //ignore } latch.signal(); }).start(); System.out.println(String.format("[%s]-主线程进入阻塞...", LocalDateTime.now())); latch.await(); System.out.println(String.format("[%s]-主线程进被唤醒...", LocalDateTime.now())); } }
某个时间的某次运行结果如下:
Copy[2019-04-07T11:55:12.647816200]-主线程进入阻塞... [2019-04-07T11:55:15.632088]-主线程进被唤醒...
小结#
在JUC的重要并发类库或者容器中,AQS起到了基础框架的作用,理解同步器的实现原理,有助于理解和分析其他并发相关类库的实现。这篇文章前后耗费了接近1个月时间编写,DEBUG过程最好使用多线程断点,否则很难模拟真实的情况。AQS里面的逻辑是相对复杂的,很敬佩并发大师Doug Lea如此精巧的类库设计,此所谓巨人的肩膀。
参考资料:
- 《The Art of Multiprocessor Programming》
- 《The java.util.concurrent Synchronizer Framework》 - http://gee.cs.oswego.edu/dl/papers/aqs.pdf
- JDK11相关源码