1.线程
1.1 简介
在介绍AbstractQueuedSynchronizer之前先介绍一下我们的线程。为了提高程序的并行化,从而产生了进程,多进程的好处是将一个程序可以划分为多个进程,之前不相互影响,并且可以比较容易的进行内存共享和通信。并且在多cpu的情况下如果只有一个进程就会浪费cpu的资源。那有了进程为什么还会有线程呢,这是因为进程的创建和销毁会耗费较多的资源,所以此时通过多线程来进行并发编程。一个进程可以包含多个线程,进程内的线程共享同一个堆空间。
1.2 线程状态
线程生命周期
- 新建状态,此时,线程已经被初始化,其内部的run方法已经被覆盖或者实现
- 就绪状态,调用线程的start方法,线程进入就绪状态,此时,线程被放入就绪队列,等待获取cpu执行权
- 运行状态,此时,线程已经从就绪列表中被取出来抢占了cpu执行权,至于是怎么被选取的,这是优先调度算法的内容,后面再谈
- 阻塞状态,处于运行中的线程由于某种原因,放弃对cpu的执行权,此时,进入阻塞状态,阻塞包括等待阻塞,同步阻塞与其他阻塞,等待阻塞包括wait方法调用,同步阻塞一般为在锁的等待队列中,其他阻塞一般为thread的join方法或者sleep方法进行挂起
- 终结状态,当程序运行完毕(包括成功或者抛出异常)是线程进入终结状态。
线程状态:
2.AbstractQueuedSynchronizer分析
AbstractQueuedSynchronizer是java中的同步阻塞器,也是实现ReentrantLock的关键。并且是实现线程工具类的核心方法。在AbstractQueuedSynchronizer中,使用了cas操作来进行线程状态的变更,下面,我们来看一看其中的重要方法。
2.1 Node类介绍
Node类是AbstractQueuedSynchronizer中的内部类,代表wait队列中的一个节点,wait队列是什么呢,其实就是阻塞队列,包含condition阻塞和同步阻塞
2.1.1 重要变量
//标记表示节点正在共享模式下等待
static final Node SHARED = new Node();
//标记标识节点正在独占模式下等待
static final Node EXCLUSIVE = null;
//waitStatus为1表示该节点此时为取消节点,即因为超时或者被interrupt打断处于此状态,当线程处于此状态,则不能改变状态,也不能重新被阻塞
static final int CANCELLED = 1;
//waitStatus为-1表示当前节点正处于运行状态,此时表示当其前置节点释放锁时他可以被唤醒
static final int SIGNAL = -1;
//waitStatus为-2时表示当前节点正处于CONDITION队列中,直到它从CONDITION队列中移除才能重新进入就绪队列
static final int CONDITION = -2;
//waitStatus为-3时表示当前节点状态在共享模式中使用,表示可运行状态
static final int PROPAGATE = -3;
//指示节点在节点队列中的等待状态,默认为0,使用cas进行修改
volatile int waitStatus;
//当前节点的前驱节点,主要可以用来检查自身的前驱节点的状态来决定自身的行为
volatile Node prev;
//当前节点的后续节点
volatile Node next;
//当前节点对应的线程,调用构造函数进行初始化,并在出队时置为null
volatile Thread thread;
//在独占模式下表示condition队列中的下一个节点。在共享模式中,用于标识是否为共享模式
Node nextWaiter;
2.1.2重要方法
//判断是否为共享节点,用nextWaiter标识
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回对应的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
2.2重要的变量
//等待队列的头结点
private transient volatile Node head;
//等待队列的尾节点
private transient volatile Node tail;
//同步资源
private volatile int state;
2.3重要方法
2.3.1 acquire方法
可以看到acquire方法中先调用了tryAcquire方法,步骤如下:
1.tryAcquire方法尝试获取资源,如果获取成功直接返回
2.如果tryAcquire不成功,那么调用addWaiter将当前线程加入等待队列,并标记为独占模式
3.然后acquireQueued使线程在等待队列中循环获取资源
4.获取资源后,调用selfInterrupt获取中断状态
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面,就逐个介绍
2.3.1.1 tryAcquire方法
tryAcquire方法AQS框架不实现,由底层同步器进行封装
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
2.3.1.2 addWaiter方法
addWaiter方法主要用于将Node节点放入等待队列中进行等待
private Node addWaiter(Node mode) {
//用当前线程与mode对自身进行初始化,mode分为独占和共享
Node node = new Node(Thread.currentThread(), mode);
// 先调用一次快速方法,如果失败,调用enq方法进行入队列循环
Node pred = tail;
if (pred != null) {
node.prev = pred;
//如果等待队列的tail节点不为空,那么将node设置为新的tail节点
if (compareAndSetTail(pred, node)) {
//如果cas设置tail节点成功,那么设置之前尾节点的next节点为node
pred.next = node;
return node;
}
}
//如果没有设置成功,调用enq方法设置
enq(node);
return node;
}
compareAndSetTail是用cas设置tail节点的方法,不过在addWaiter方法中,只调用一次,可能会失败
private final boolean compareAndSetTail(Node expect, Node update) {
//CAS调用
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
enq方法,通过循环调用CAS,此方法一定会成功,不过在线程竞争比较激烈的情况下性能可能会有点差。可以看到,此方法中采取了Atomic类中经典的循环调用cas方法,只有成功才会return
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果尾节点为空,那么代表等待队列为空,进行CAS设置头结点,并且令尾节点和头结点相等
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果不为空,进行循环设置尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.3.1.3 acquireQueued方法
此方法使线程在等待队列中等待获取资源,直到等到获取到资源才会返回。如果线程在此过程中被中断过,那么会用interrupted进行记录。此方法步骤为
1.判断当前线程所处节点的前驱节点是否为头结点
2.如果是前驱节点为头结点,那么尝试获取资源,如果获取资源成功
3.那么设置头结点为当前线程所处节点,并且将前面头结点的next设置为空(使得前面头结点不关联任何节点,方便gc清理)
4.设置failed = true,并且返回线程是否中断过
5.如果不是头结点或者没有获取到资源,那么会调用shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法,如果parkAndCheckInterrupt返回true,代表线程被中断过,对标志位进行记录
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断当前线程所处节点的前驱节点是否为头结点
if (p == head && tryAcquire(arg)) {
//设置头结点为当前线程所处节点,并且将前面头结点的next设置为空
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面,我们来介绍shouldParkAfterFailedAcquire方法,此方法用来判断检查线程是否应该被阻塞
1.记录前驱状态
2.当前驱状态为SIGNAL时,安心进行park操作,因为此时前驱如果拿到资源,会对后继节点进行通知
3.当ws>0时,代表前驱状态为cancel,此时,从当前节点的前驱节点遍历,直到遇到不为cancel状态的前驱节点,设置当前节点的前驱节点为此节点(此时一定能找到这样一个节点,因为head节点一定不为cancel)。
4.如果前驱节点小于0,那么此时将ws的前驱节点的waitState设置为Node.SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
再介绍一下parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
//对线程进行阻塞操作,内部实现调用了unsafe的park操作
LockSupport.park(this);
//返回线程是否被中断,并重置中断位
return Thread.interrupted();
}
2.3.2 release方法
release方法主要用于独占模式下对资源的释放,一般资源默认为1,可以看出步骤分为以下几步
1.请求释放资源
2.如果释放资源成功,那么获取当前头节点(需要说明一下,此时,头结点一般为对应的线程)
3.判断头结点是否为空,如果头结点为空或者状态为初始化,直接返回true
4.如果头结点不为空并且其状态不为0,对h的后继节点进行唤醒
5.如果释放资源失败,返回false
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//如果头结点不为空并且其状态不为0(此时应该为SIGNAL状态),进行唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
下面,介绍一下release中使用到的方法
2.3.2.1 tryRelease方法
与tryAcquire方法一样,tryRelease也是留给自定义同步器去覆盖实现的,如果不覆盖,调用此方法直接抛出异常
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2.3.2.2 unparkSuccessor方法
此方法主要用来唤醒后继节点,下面看看是怎么唤醒的吧
1.获取到当前节点的状态
2.如果当前节点状态小于0,重新将该节点的状态置0,不保证一定成功,允许失败
3.获取当前节点的next节点
4.如果当前节点的next节点为null或者状态为cancel,那么从尾部开始遍历,直到前驱节点为空。在遍历过程中,如果遇到前驱节点不为cancel的情况,更新当前节点需要唤醒的后继节点
5.如果能够遍历到不为cancel状态的后继节点,那么进行唤醒,否则,不做操作
private void unparkSuccessor(Node node) {
//获取到当前节点的状态,如果当前节点状态小于0,重新将该节点的状态设置为初始化
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//一般情况下当前节点的next节点即为后继节点,但是后继节点可能为null或者状态被置为Canceled
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部开始遍历,找到距离当前节点最近的状态不为cancel的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到可以唤醒的后继节点之后,对该节点进行唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
其中,compareAndSetWaitStatus就是调用unsafe的cas操作对节点状态进行更新
2.3.3 acquireShared方法
介绍完了独占模式下获取资源的方法,下面,我们来介绍共享模式下获取锁的方法。步骤如下:
1.传入需要获取的资源数,尝试获取共享锁
2.如果获取共享锁成功,不做任何操作
3.获取共享锁失败,进入等待队列中进行等待。代码如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
下面,介绍一下acquireShared调用的两个方法。