在Java中,如果要对某一共享资源进行互斥访问,一般可以通过使用synchronized关键字或者是java.util.concurrent.locks包下面的某些类来实现,比如ReentrantLock。这个包下的所有类几乎都是以AbstractQueuedSynchronizer(后文中简称AQS)为基础来构建的,AQS为锁的实现提供了一个基本机制,包括挂起和唤醒某一线程、自动管理同步状态以及将获取不到锁的线程入队等,在这些基础机制之上,可以实现各种特性不一样的锁,比如公平锁,非公平锁,可被中断的锁等等
在基本用法上,synchronized和ReentrantLock很相似,基本用法都差不多,语义也差不多,都具备可重入等特性。synchronized是在字节码层面上通过monitorenter和monitorexit来实现的,线程的阻塞(获取不到锁时)和唤醒(待获取的锁被另一拥有它的线程释放时)是内核自己调度的,而基于AQS的锁实现可以让实现者自己决定线程获取锁的规则(当某一线程释放锁,n多之前因为没有获取到锁而阻塞的线程去获取这个已经释放的锁时),比如公平锁和非公平锁,相比于synchronized,ReentrantLock有比较多的高级特性,有业务需求的话也可以通过实现AQS来进行扩展,可扩展性较好
分析AQS之前需要知道CAS(即compare-and-swap,比较并交换)。某些硬件的指令集提供了CAS指令,CAS指令需要三个操作数,分别是内存地址Loc,旧的预期值oldValue,新的更新值updateValue,CAS指令执行时,当且仅当oldValue等于Loc位置处的值时,将Loc处的值更新为updateValue,否则不更新。这个CAS操作是一个原子操作,且比synchronized更轻量。
如果不想使用synchronized来更新共享变量的值(考虑到性能),可以用CAS+volatile的方式来保证线程安全,CAS操作保证了原子性,而volatile保证了可见性
AQS里面会用到一个比较重要的类,sun.misc.Unsafe,这个类的大部分方法都是native方法,可以执行一些比较“另类”的方法,例如向操作系统申请内存,也可以用Unsafe来执行CAS操作,挂起和唤醒线程也是通过Unsafe的park和unpark来实现的
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head; //链表的头指针
private transient volatile Node tail; //链表的尾指针
private volatile int state; ///同步状态
static final class Node {
static final Node SHARED = new Node(); //当前等待的锁为共享模式
static final Node EXCLUSIVE = null; //当前等待的锁为排他模式
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
可以看到AQS里面有一个内部类Node,Node里面有prev和next引用,因此Node可以构成一个双向链表,并且Node里面还有一个Thread的引用,可以猜想,每一个打算去获取锁的线程在AQS里面都会先包装成一个Node,假如获取不到锁的话,会将当前的Node加入这个链表,当某一个线程释放锁的时候,可以从链表里面取一个正在等待的Node,并将锁给他
首先来看看获取锁的方法
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
整个方法的逻辑为:首先尝试去获取锁,如果由于竞争失败导致获取锁失败,则将自身包装成一个node对象加入到链表末尾,并尝试去获取锁
其中tryAcquire(arg)为一个protected方法,需要子类去实现,这个方法试图去获取锁,如果成功获取到锁,则返回true,否则返回false。等下次分析某个AQS的实现类时,可以看到tryAcquire(arg)的一些具体实现(会涉及到同步状态state),但是现在可以不用管它,只需要知道,如果获取到锁了就返回true,没获取到锁就返回false
如果没有获取到锁,则进入addWaiter(Node mode)方法
本文中,说到步骤的时候都是以 “方法名.步骤n” 这样一种形式来描述,以避免冲突,假如方法是当前正在分析的方法,则省略方法名,只说步骤n
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode): //1
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { //2
node.prev = pred;
if (compareAndSetTail(pred, node)) { //3
pred.next = node;
return node;
}
}
enq(node); //4
return node; //5
}
这个方法的步骤为:
1:首先利用当前线程创建node节点,并取出尾节点tail
2:判断之前的尾节点tail是否为null。如果不为null,则将新创建节点node的prev引用指向尾节点tail,并进入步骤3。如果尾节点为null,则进入步骤4
3:这时候利用CAS操作试图将尾节点修改为新创建的node节点。如果CAS操作成功,则新创建的节点成为尾节点,并且将之前尾节点的next引用指向新创建的节点node,实现一个链表中两个节点的连接行为,并返回这个新创建的节点node。如果CAS操作失败,则说明已经有其它线程抢先一步将它自己加入到了链表中,此时,进入步骤4
4:如果尾节点为null或者步骤3中的CAS操作失败,则会进入此步骤,注意步骤4中传入的参数为新创建的node节点
接下来看下步骤4中的enq(Node node)方法,注意,enq方法是用final修饰的
private Node enq(final Node node) {
for (; ; ) {
Node t = tail; //1
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //2
tail = head;
} else {
node.prev = t; //3
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
此方法中,for是一个无线循环,出口只有else分支里面的一个return语句,此方法的步骤为:
1:首先取出尾节点tail,并判断尾节点是否为null。如果尾节点为null,则进入步骤2,否则进入步骤3
2:新创建一个节点(这个节点并没有绑定某一个线程,可以认为是一个“空”节点),并通过CAS操作尝试将其设置为头结点,如果设置成功,将尾节点也指向刚创建的节点,并进入下一次循环,此时tail节点已经不为null。需要注意的是这里并没有采用CAS的方式将新创建的节点设置为tail,这是不必要的,因为return语句在else分支里面,而进入else分支的条件是尾节点不为null,假如我们在tail = head处打一个断点,并且让当前线程跑到这个断点处,这时候让其它的线程调用acquire方法,当其他的线程进入enq方法后,由于头结点已经被设置了,所以其他线程在//2处的CAS操作将失败,此时会进行下一次循环,从这可以看到,初始化的时候,假如A线程设置了head节点,那就只有A线程有权利更新尾节点tail,之后方法才能正常进行下去。而只有设置了tail节点之后,代码才有可能走到else分支从而返回。
3:如果尾节点tail存在,则将传入参数node的prev指针指向tail,并利用CAS操作尝试将传入的节点node设置为尾节点,如果CAS操作成功,则将之前尾节点的next指针指向传入的参数node,并返回之前的尾节点tail,如果CAS操作失败(其它线程抢先一步将其自己设置成了尾节点),则进入下一次循环并继续尝试将传入的参数节点node设置为尾节点,直到CAS操作在某次重试中成功
当enq(Node node)方法成功返回后,进入步骤addWaiter.5,返回新创建的node
所以addWaiter()要做的就是,在多线程的环境下,竞争锁失败的时候,将当前线程包装成一个node节点,并将其安全地插入到链表的末尾。
需要注意的是,假如之前一个线程已经获得了某一个锁且未释放,也没有其它线程请求去获取锁,这时候链表中并没有任何节点元素,这时候假如有另一个线程去请求获取这个锁,则会将当前线程封装的节点node插入链表末尾,这个节点在链表中的位置并不是头结点,而是链表中的第二个节点。初始化链表的时候会插入一个不绑定线程的“空”节点
接下来调用acquireQueued方法
acquireQueued(final Node node, int arg)方法如下所示,可以看到,只有在当前节点的前一个节点是头结点head的时候才有可能获取到锁(即当前节点的前一个节点为head头结点是当前节点能获取到锁的前提条件,考虑到非公平锁,这么说其实有点不太正确)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //1
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //2
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //3
parkAndCheckInterrupt())
interrupted = true;
}
} finally { //4
if (failed)
cancelAcquire(node);
}
}
该方法的步骤为:
1:首先将failed初始化为true,表示获取锁是否失败,失败了会在步骤4中做一些后续工作,至于interrupted的含义,会在后文看到。然后取出当前节点node的前一个节点p 。如果p为头结点head且当前线程能获取到锁,则进入步骤2。否则,如果不是头结点或者获取不到锁,进入步骤3
2:当当前节点node的prev为head头结点,且能获取到锁时,进入该步骤。将当前节点设为头结点,并且将之前的head头结点从链表中分离,这时候将failed设置为false,表示成功获取到了锁。并且返回interrupted。注意,这里设置头结点head的时候并不需要通过CAS操作来进行,因为没有竞争
3:如果当前节点node的前一个节点不为head头节点,或者没有获取到锁,会进入该步骤。该步骤会调用shouldParkAfterFailedAcquire(Node pred, Node node)方法,这个方法的功能已经被方法名描述的很形象了,就是判断锁获取失败之后需不需要将当前线程挂起(AQS中通过park和unpark方法来挂起和唤醒线程),假如shouldParkAfterFailedAcquire返回true,则调用parkAndCheckInterrupt方法,如果parkAndCheckInterrupt也返回true,则将interrupted赋值为true
首先看看shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //1
return true;
if (ws > 0) { //2
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //3
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
先说一下内部类Node的waitStatus字段以及CANCELLED、SIGNAL、CONDITION、PROPAGATE等字段,在源码里有关于这几个字段的简短说明
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
大致意思就是说,waitStatus表示了当前节点所代表线程所在的状态,有可能为CANCELLED、SIGNAL、CONDITION、PROPAGATE、0中其中的一种
CANCELLED表示当前线程由于超时或者被中断
SIGNAL表示当前节点的next指向的节点正在阻塞或即将被阻塞,所以当前节点释放锁或者取消时要唤醒其后一个节点(也就意味着假如当前节点的waitStatus为SIGNAL,当它释放锁或者取消时会去唤醒它的下一个节点)
CONDITION:表示当前节点处于条件队列中
PROPAGATE:表示唤醒当前节点的动作需要向后传播(不仅仅限于当前节点的后一个节点),在某些共享锁里面会用到这个,排它锁不会用到这个值
1:如果当前节点node的前一个节点pred的waitStatus为SIGNAL,则返回true(如果pred节点的waitStatus为SIGNAL,则意味着当它释放锁或者取消时会去唤醒它的下一个节点node,所以这时候我们只需要将node所代表的线程挂起就行了,所以返回true)
2:如果当前节点node的前一个节点的waitStatus大于0,会将node之前的waitStatus大于0的所有节点从链表中删除掉,直到某一个节点的waitStatus小于等于0,并且将这个节点和node互相关联起来
3:如果当前节点node的前一个节点pred的waitStatus等于0,则尝试用CAS操作将pred节点的waitStatus值改为Node.SIGNAL
在分支2和分支3中,由于其前面一个节点的waitStatus不为SIGNAL,所以没办法保证会有一个线程去唤醒node,则只能返回false且进入下一次循环(即只有在前一个节点的waitStatus为SIGNAL的情况下,后面的线程才有可能被唤醒,即挂起node所代表的线程是安全的)
如果shouldParkAfterFailedAcquire方法返回true,则进入parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
可以看到parkAndCheckInterrupt()方法很简单,只是挂起当前线程并返回当前线程的中断标记(如果有其他线程唤醒它)
如果当前线程被设置过中断标记(注意,interrupted方法会清除掉线程的中断状态)
则步骤acquireQueued.3中会将interrupted置为true并返回,以标志当前线程在等待锁的过程中是否被中断过
当当前线程被唤醒后,又会进入下一次循环,继续下一次获取锁的尝试,直到其获取到锁或者抛出异常
当acquireQueued返回true时,acquire方法的selfInterrupt方法会设置当前线程的中断标记
整个acquire方法的精髓就在这儿了,一个节点node的前一个节点假如是head节点,他才会去尝试获取锁,需要注意的是,这里不一定能够获取到锁(非公平锁),假如获取不到锁,线程会尝试将自己挂起,等待被中断或者被其它的线程唤醒,当被唤醒后,会继续尝试去获取锁,直到获取到锁或者抛出异常,又或者被取消等等。。
接下来说下线程释放锁的过程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { //1
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在unparkSuccessor方法中,如果node的waitStatus为0,首先会尝试将node的waitStatus设置为0,就算CAS操作失败了也不要紧,这意味着node的waitStatus已经被改变了。
在步骤1中,如果当前节点node的后一个节点s不为null且s的waitStatus小于等于0,则可以直接唤醒s对应的线程,如果s为null或者s的waitStatus大于0,则会从链表的末尾开始向前找,直到找到一个距离节点node最近的且waitStatus小于等于0的节点,并唤醒它
Doug Lea大师把CAS指令玩的飞起。只能膜拜!!
待续...
key words:AQS CAS Unsafe
关于AQS,多线程大师Doug Lea发表了一篇论文,《The java.util.concurrent Synchronizer Framework》,感兴趣的可以看看