填坑上期没有说的AQS也就是AbstractQueuedSynchronizer
学习的过程大致有两种思路
一种是:自顶向下 从应用到原理(我走的大致是这条)
另一种是:自底向上 从原理到应用
虽然说java的底层以及在unsafe这个类中封装了cas的方法,支持了cas的调用,但是对于上层业务来说,怎么进行无感调用,此外我们再业务代码中往往想要同步的是某些被竞争的资源,这种被竞争的资源往往在生产中是以对象的形式进行封装,而cas只能原始的修改内存中的一个值,那我们如何用cas去同步对象呢?所以我们就需要进行抽象。
如果想要设计一个同步管理的框架需要怎么设计?
1、通用性,下层实现透明的同步机制,同时与上层的业务进行解耦。
2、利用CAS,原子的修改共享标记位。
3、等待队列,分为两种情况,第一种只要询问一下资源即可,我们返回ture或者false即可,第二组必须使用资源,此时我们就需要对第二组情况进行设计队列使其进行排队。
思路如此,我这样的菜狗肯定实现不了呀,所以我们看看大师dog(不是) Doug Lea的aqs源码的内容。
又到了我最喜欢(不是)的读源码内容了
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
头结点
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
尾节点
*/
private transient volatile Node tail;
/**
* The synchronization state.
判断共享资源是否正在被占用的那个标志位,使用volatile保证了线程之间的可见性。一开始我在想为啥要是int而不是Boolean呢?实际上就是线程的独占和共享两种模式,共享模式标记占用资源的数量,所以用int来表示。
*/
private volatile int state;
若一个线程在当前时刻没有获取到共享资源,他可以选择进行排队,而这个队列就是FIFO数据结构即先进先出的双向链表,里面的head和tail表示队列的头和尾。
我们看看线程的Node是个啥样的
static final class Node {
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;//等待状态
volatile Node prev;//前指针
volatile Node next;//后指针
volatile Thread thread;//线程对象
...
}
waitStatus是一个枚举值,包含四个状态(取消,信号,状态,传播)
两种使用场景 1.尝试获取锁(修改标记位),立即返回、tryAcquire
2.一个要获取锁,愿意进入队列等待,直到获取acquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
被protected修饰的方法,参数是一个int代表对state的修改,返回值则是是否成功获取锁,只有一行实现,就是抛出一个异常,使用就是它的继承类必须override这个tryAcquire方法否则就直接抛出不支持该操作的异常,为上层调用开发这个空间
public class test02 extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg){
//上层业务逻辑
if (arg!=1)
return false;
if (getState()==1)
return false;
return compareAndSetState(0,1);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我解释一下,如果tryAcquire获得锁,那!tryAcquire就直接跳出去不再执行后面的内容了执行selfInterrupt();
如果tryAcquire返回false则执行acquireQueued()方法进行排队以等待锁acquireQueued(addWaiter)
这俩方法是嵌套的所以我看看这个的源码,顾名思义就是将当前的线程封装为一个node,
加入等待队列而返回值就是当前的节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
流程大致是这样的,先创建一个节点new Node,然后将这个新建的节点插入到队尾,
我们先获取尾节点的指针Node pred = tail;让它变成当前节点的前置节点,如果为节点不为空,
则通过cas操作将当前的节点置为尾节点然后将前置节点的next指针指向以及成为尾节点的当前指针。
但是
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
不会有线程安全问题吗?仔细分析,不难发现,这些代码只是将当前节点指向尾节点,
即使此时尾节点发生了变动,其实也没太大影响。 若是其cas失败(也就是没有进入if判断)
,那就会在完整的入队方法里也就是 enq/()
private Node enq(final Node node) {
for (;;) {//自旋
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//初始化
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq会对当前队列进行初始化通过cas将当前节点插入直到入队成功为止
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
} acquireQueued方法内定义了一个局部变量failed,
初始值为true,整个方法中只有在return之前failed值会改为false
而在finally中通过判断failed的值进行一个名为cancelAcquire的操作,
所以我们能发现在acquireQueued方法正常执行且return时,failed都为false
只有抛出异常的时候进入finally的时候failed才是true此时才会进行cancelAcquire(node);
操作 cancelAcquire的内部实现简单来说就是将Node的waitStatur设置为CANCEL以及一些其他的清理操作就不细讲了、
咱主要还是看try中的内容
我们发现出现了一个陌生的变量interrupted,看应该是中断,如果当前的线程需要被挂起并且成功挂起,
并且返回了thread点interrupted方法为trur的时候,变量interrupted才会设置为true并返回~ 主体仍然是一个自旋状态,
如果当前的节点前置节点为头结点,且当前线程尝试获取锁成功了,那直接返回即可(在fifo中,头结点其实是一个虚节点,
头结点并不是当前需要去拿锁的节点只是充当占位的摆设)而第二个节点才是真正要去拿锁的节点,当第二个节点拿到锁
之后就会变成头节点头结点(老的)就会出队,所以我们常会看到 if (p == head && tryAcquire(arg))当前节点的是否为头
结点这样的代码,作用就是判断这个节点有没有资格拿锁。
修饰符是public和final,说明所以继承类都能直接调用这个方法且不允许继承类擅自进行重载,意思是我一定要得到锁。
判断是否需要挂起当前线程,通过前置节点的waitStatus
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
ws为SIGNAl说明前置节点也在等待拿锁,所以当前节点是可以挂起休息的直接返回true
如果waitStatus>0说明状态只有可能是CANCEL,所以可以将其删除,
如果waitStatus是其他状态,那么既然当前的节点已经加入了,那么前置节点就要做好准备来等待锁所以通过cas将前置节点的waitStatus置为SINGLE这两种情况下返回false进行下一轮判断,如果shouldParkAfterFailedAcquire返回true则代表当前的节点需要被挂起,则执行真正的挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
简单来说就是当前的节点处于头节点的后一个,就会不断自旋尝试拿锁知道拿锁成功,否则进行判断是否需要被挂起,而是否需要挂起,则是prev.waitStatus,如果当前线程所在的节点之前除了head还有其他的节点,且waitStatus为SINGAL就需要被挂起,这样就保证了head的之后只有一个节点在通过cas获取锁,队列里面其他的线程都已经被挂起或者正在被挂起,最大限度的避免了无用的自旋消耗了cpu。释放锁的时候发生什么事情了?咱看看release方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
主要看看unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
传来的HEAD,该方法是为了唤醒head后面的node,使得其自旋地获取锁,唤醒head之后被挂起的线程