java并发编程实战三之Lock原理
java locks包的核心类是AQS(AbstractQueuedSynchronizer), AQS的核心实现其实是一个自旋锁
1. 自旋锁(Spin Lock)
自旋锁是指当一个线程尝试获取某个锁时,如果该锁已被其他线程占用,就一直循环检测锁是否被释放,而不是进入线程挂起或睡眠状态
自旋锁适用于锁保护的临界区很小的情况,临界区很小的话,锁占用的时间就很短
1.1 简单自旋锁
下面的代码是自旋锁的一种简单实现
它有很多的缺陷,你可以想到多少缺点???
public class SpinLock {
/**
* 使用原子类来标识线程是否获取到了锁
*/
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
//如果锁未被占用,则设置当前线程为锁的拥有者
while (!owner.compareAndSet(null, currentThread)){
}
}
public void unlock(){
Thread currentThread = Thread.currentThread();
// 只有锁的拥有者才能释放锁
owner.compareAndSet(currentThread, null);
}
}
1.2 Ticket Lock
为了解决公平性问题,我们模仿银行排队的方式,设计了另一种公平锁
Ticket Lock 虽然解决了公平性的问题,但是多处理器系统上,每个进程/线程占用的处理器都在读写同一个变量serviceNum ,每次读写操作都必须在多个处理器缓存之间进行缓存同步,这会导致繁重的系统总线和内存的流量,大大降低系统整体的性能
public class TicketLock {
/**
* 服务号
*/
private AtomicInteger serviceNum = new AtomicInteger();
/**
* 排队号
*/
private AtomicInteger ticketNum = new AtomicInteger();
public int lock(){
//首先原子性地获得一个排队号
int myTicketNum = ticketNum.getAndIncrement();
//只要当前服务号不是自己的就不断轮询
while (serviceNum.get() != myTicketNum){
}
return myTicketNum;
}
public void unlock(int myTicket){
int next = myTicket + 1;
serviceNum.compareAndSet(myTicket, next);
}
}
1.3 CLH队列锁
CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
CLH在SMP系统结构下该法是非常有效的。但在NUMA系统结构下,每个线程有自己的内存,如果前趋结点的内存位置比较远,自旋判断前趋结点的locked域,性能将大打折扣
当一个线程需要获取锁时:
a. 创建一个的QNode,将其中的locked设置为true表示需要获取锁
b. 线程对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前趋结点的引用myPred
c. 该线程就在前趋结点的locked字段上旋转,直到前趋结点释放锁
d. 当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前趋结点
CLH Lock
public class CLHLock {
/**
* tail 指向最后线程的节点
* current 每个线程的节点
*/
private final AtomicReference<CLHNode> tail = new AtomicReference<>(new CLHNode());
private final ThreadLocal<CLHNode> current;
public CLHLock() {
current =ThreadLocal.withInitial(CLHNode::new);
}
public void lock(){
CLHNode own = current.get();
own.locked = true;
CLHNode preNode = tail.getAndSet(own);
//轮询前驱节点
while (preNode.locked){
}
}
public void unlock(){
//当前线程节点上释放
current.get().locked = false;
}
private static class CLHNode{
private volatile boolean locked = false;
}
}
1.4 MCS锁
为了解决NUMA系统结构下CLH自选出现的性能问题,MCS队列锁应运而生
MSC与CLH最大的不同并不是链表是显示还是隐式,而是线程自旋的规则不同:CLH是在前趋结点的locked域上自旋等待,而MSC是在自己的结点的locked域上自旋等待。正因为如此,它解决了CLH在NUMA系统架构中获取locked域状态内存过远的问题。
a. 队列初始化时没有结点,tail=null
b. 线程A想要获取锁,于是将自己置于队尾,由于它是第一个结点,它的locked域为false
c. 线程B和C相继加入队列,a->next=b,b->next=c。且B和C现在没有获取锁,处于等待状态,所以它们的locked域为true,
尾指针指向线程C对应的结点
d. 线程A释放锁后,顺着它的next指针找到了线程B,并把B的locked域设置为false。这一动作会触发线程B获取锁
MCS Lock
public class MCSLock {
/**
* 队列初始化时没有结点,tail指向null
*
*/
private final AtomicReference<MCSNode> tail = new AtomicReference<>(null);
private ThreadLocal<MCSNode> current;
public MCSLock() {
current = ThreadLocal.withInitial(MCSNode::new);
}
public void lock(){
// 线程A想要获取锁,于是将自己置于队尾,由于它是第一个结点,它的locked域为false
MCSNode own = current.get();
MCSNode preNode = tail.getAndSet(own);
// 线程B和C相继加入队列,a->next=b,b->next=c。
// 且B和C现在没有获取锁,处于等待状态,所以它们的locked域为true,尾指针指向线程C对应的结点
if(preNode != null){
own.locked = true;
preNode.next = own;
// 在自己的结点的locked域上自旋等待
while (own.locked){}
}
}
public void unlock(){
MCSNode own = current.get();
if(!own.locked){
return;
}
// 最后一个获取锁的线程
if(own.next == null){
if(tail.compareAndSet(own, null)){
return;
}
// 过程中又有线程获得锁
while (own.next == null){}
}
// 线程A释放锁后,顺着它的next指针找到了线程B,并把B的locked域设置为false.这一动作会触发线程B获取锁
own.next.locked = false;
own.next = null;
}
private static class MCSNode{
private MCSNode next;
private volatile boolean locked = false;
}
}
2. Unsafe和LockSupport
2.1 Unsafe类简介
首先,我们的代码中不应该使用这个类,虽然可以通过反射的方式获取到它的示例并使用
个人觉得,我们也不需要详细了解这个类的使用
简单了解:
- put,get方法
// l offset变量相对偏移量,可用objectFieldOffset(java.lang.reflect.Field field)获取
putObject(java.lang.Object o, long l, java.lang.Object o1)
getObject(java.lang.Object o, long l) - 变量偏移量
objectFieldOffset(java.lang.reflect.Field field)
staticFieldOffset(java.lang.reflect.Field field) - 内存管理
allocateMemory(long l)
reallocateMemory(long l, long l1)
setMemory(java.lang.Object o, long l, long l1, byte b)
copyMemory(java.lang.Object o, long l, java.lang.Object o1, long l1, long l2)
freeMemory(long l) - CAS操作
compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2) - 实例化
allocateInstance(java.lang.Class<?> aClass) - 数组
arrayIndexScale(java.lang.Class<?> aClass)
arrayBaseOffset(java.lang.Class<?> aClass) - 阻塞和唤醒
park(boolean b, long l)
unpark(java.lang.Object o)
2.2 LockSupport
- 唤醒线程或者归还令牌
unpark(Thread thread) - 阻塞线程,附加额外信息
park(Object blocker) - 阻塞线程一段时间,附加额外信息
parkNanos(Object blocker, long nanos) - 阻塞线程直到某时间,附加额外信息
parkUntil(Object blocker, long deadline) - 阻塞线程
park() - 阻塞线程一段时间
阻塞线程一段时间 - 阻塞线程直到某时间
parkUntil(long deadline) - Thread添加额外信息
setBlocker(Thread t, Object arg)
getBlocker(Thread t)
这里获取的许可永远只有一个,与底层c++实现有关。HotSpot里Parker有一个私有_counter变量,无论调用多少次unpark,_counter都被设为1
3. AbstractQueuedSynchronizer
AQS改进了CLH队列自旋锁,结合了自旋和睡眠/唤醒两种方法的优点
3.1 双端链表Node
static final class Node {
/** 标记当前结点是共享模式 */
static final Node SHARED = new Node();
/** 标记当前结点是独占模式 */
static final Node EXCLUSIVE = null;
/**
* 结点的等待状态 可能的取值包含:CANCELLED、SIGNAL、CONDITION、PROPAGATE和0
* 在同步等待队列中的节点初始值为0,在条件等待队列中的节点初始值为CONDITION
* 在独占模式下,取值为CANCELLED、SIGNAL、0中之一
* 在共享模式下,取值为CANCELLED、SIGNAL、PROPAGATE和0中之一
* 在条件等待队列中,取值为CANCELLED、CONDITION中之一
*/
volatile int waitStatus;
/** 拥有当前结点的线程 */
volatile Thread thread;
/** 线程已经被取消 */
static final int CANCELLED = 1;
/**
* 后续节点需要唤醒
* 节点插入队列时,节点代表的线程睡眠前会将前一个节点的waitStatus置为SIGNAL
* 当前一个节点释放锁时,如果其waitStatus置为SIGNAL,则会唤醒其后下一个节点线程
*/
static final int SIGNAL = -1;
/** 表示节点代表的线程正处于条件等待队列中等待signal信号 */
static final int CONDITION = -2;
/** 在共享模式下使用,表示同步状态能够无条件向后传播 */
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
/**
* 在条件等待队列中,用于指向下一个节点
* 在同步等待队列中,用于标记该节点所代表的线程在独占模式下还是共享模式下获取锁
*/
Node nextWaiter;
public Node() {
}
/**
*
* @param thread 一般为当前线程
* @param mode 排他EXCLUSIVE 共享SHARED
*/
public Node(Thread thread, Node mode) {
this.thread = thread;
this.nextWaiter = mode;
}
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
}
3.2 同步等待队列
waitStatus在同步等待队列中不同的模式下,有不同的取值:
- 在独占模式下,取值为CANCELLED、SIGNAL、0中之一
- 在共享模式下,取值为CANCELLED、SIGNAL、PROPAGATE和0中之一
nextWaiter
- 在同步等待队列中,用于标记该节点所代表的线程在独占模式下还是共享模式下获取锁
3.3 独占模式下锁获取
这里我们只细讲acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && //1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //2 -> 3
selfInterrupt(); //4
}
- 首先调用tryAcquire函数,线程尝试获取锁,如果获取成功则直接返回
- 如果获取失败,则调用addWaiter函数,将线程封装成一个Node节点并插入同步等待队列尾部;Node.EXCLUSIVE代表独占模式
/**
* 此处在尾部插入node时,先设置node的prev,再CAS修改队列tail指向,修改成功再设置前一个节点的next域
* 队列中,如果某个node的prev!=null,并不一定表示node已经成功插入队列中,如果某个node的前一个节点的next!=null,则该node一定位于队列中
* @param mode 模式
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 双端链表的add操作
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//头尾都为null 初始化以及enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail为空,初始化head和tail节点 head lazy initialize
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 双端链表的基本操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 接着调用acquireQueued函数,将前驱节点的waitStatus标记为SIGNAL后睡眠,等待前驱节点释放锁后唤醒,被唤醒后则继续尝试获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 自旋 当且仅当前驱节点是head,尝试获取锁
// 当前驱节点等于head时,说明前驱线程为当前正拥有锁,或者刚刚释放锁并且唤醒了当前节点
if (p == head && tryAcquire(arg)) {
// 设置成功获取锁的node节点为队列头head
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
// 如果前驱节点不是队列head或者获取锁失败,则设置前驱节点waitStatus为SIGNAL,并睡眠
// shouldParkAfterFailedAcquire 前置节点waitStatus设为-1 返回false
// parkAndCheckInterrupt 阻塞当前线程 返回currentThread().isInterrupted(true)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
protected boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){
return true;
}
// ws = 1 CANCELLED 跳过取消的节点
if(ws > 0){
do {
node.prev = pred = pred.prev;
}while (pred.waitStatus > 0);
pred.next = node;
}else {
// 0 或者 PROPAGATE
compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
}
return false;
}
/**
* 线程在获取锁过程中,可能因为出错、被中断或超时而取消获取锁
* @param node
*/
protected void cancelAcquire(Node node){
if (node == null) {
return;
}
//线程引用清空
node.thread = null;
Node pred = node.prev;
// 若前驱节点是CANCELLED,则跳过继续往前找
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// 前驱节点中不是CANCELLED节点,获取pre next指向
Node predNext = pred.next;
// 设置waitStatus值为CANCELLED,标记节点已取消
node.waitStatus = Node.CANCELLED;
// 如果被取消的node是尾部节点,则设置tail指针指向前驱节点,并且设置前驱节点的next指针为null
if(node == tail && compareAndSetTail(node, pred)){
compareAndSetNext(pred, predNext, null);
}else {
int ws;
if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
Node.SIGNAL))) && pred.thread != null){
Node next = node.next;
if(next != null && next.waitStatus <= 0){
compareAndSetNext(pred, predNext, next);
}
}else {
unparkSuccessor(node);
}
node.next = node;
}
}
/**
* 唤醒后继节点
* @param node
*/
private void unparkSuccessor(Node node) {
// 在独占模式下,waitStatus<0此时为SIGNAL,将SIGNAL标志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示该node.next线程取消了获取锁,则从队列尾部遍历,找到队列中第一个没有取消的线程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
- 如果线程睡眠过程中产生中断,则调用selfInterrupt函数让线程自我中断一下,设置中断标志,将中断传递给外层
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
3.4 独占模式释放锁
public final boolean release(int arg) {
if(tryRelease(arg)){
// 每个队列中的线程在获取到锁以后,会将队列中包含该线程的Node节点设置为head,所以head指向的Node即为当前占有锁的线程,也就是当前正在进行释放锁操作的线程
Node h = head;
if(h != null && h.waitStatus != 0){
// 唤醒后续的结点
unparkSuccessor(h);
}
return true;
}
return false;
}
/**
* 唤醒后继节点
* @param node
*/
private void unparkSuccessor(Node node) {
// 在独占模式下,waitStatus<0此时为SIGNAL,将SIGNAL标志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示该node.next线程取消了获取锁,则从队列尾部遍历,找到队列中第一个没有取消的线程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
3.5 共享模式获取锁
共享模式获取锁与独占模式获取锁,仅有一点区别,就是setHead的同时会唤醒下一个shared节点
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
// 与独占模式下获取锁唯一的区别就在下面这个setHeadAndPropagate函数
setHeadAndPropagate(node, r);
// help GC
p.next = null;
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if(failed){
cancelAcquire(node);
}
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置成功获取锁的node节点为队列头head
setHead(node);
/**
* propagate > 0 说明许可还能够继续被线程acquire
* 之前的head == null 未知状态
* 之前的head被设置为PROPAGAT 说明需要往后传递
* 当前head = null 未知状态
* 当前head被设置为PROPAGAT
*/
if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
Node s = node.next;
if(s == null || s.isShared()){
doReleaseShared();
}
}
}
private void doReleaseShared() {
for(;;){
Node h = head;
if(h != null && h != tail){
int ws = h.waitStatus;
// SIGNAL状态直接唤醒下一个节点
if(ws == Node.SIGNAL){
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
}else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
//首节点的状态是0,则通过CAS设置为PROPAGATE,表示如果连接在当前节点后的node的状态如果是shared,则无条件获取锁
continue;
}
}
if(h == head){
break;
}
}
}
3.6 共享模式释放锁
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
4. Condition的实现原理
AQS通过一个FIFO单向链表来实现条件等待队列
Condition接口定义了一个类似于Object对象的监视器的接口方法。await()、 awaitNanos(long nanosTimeout)、signal()、signalAll()可以对应上Object对象的wait()、wait(long timeout)、notify()、notifyAll();而且Condition的控制更加灵活精确
4.1 await()
- 新建Condition Node包装线程,加入Condition队列
- 释放线程占有的锁
- 在同步等待队列中则睡眠等待 直到被唤醒并加入到同步队列中
- 尝试获取锁 此时节点已经加入到同步队列中
- 处理cancled节点
- 处理异常
/**
* 1. 新建Condition Node包装线程,加入Condition队列
* 2. 释放线程占有的锁
* 3. 在同步等待队列中则睡眠等待 直到被唤醒并加入到同步队列中
* 4. 尝试获取锁 此时节点已经加入到同步队列中
* 5. 处理cancled节点
* 6. 处理异常
* @throws InterruptedException
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){
throw new InterruptedException();
}
// 新建Condition Node包装线程,加入Condition队列
Node node = addConditionWaiter();
// 释放线程占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步等待队列中则睡眠等待 直到被唤醒并加入到同步队列中
while (!isOnSyncQueue(node)){
LockSupport.park(this);
// 检查是否被打断
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
// 尝试获取锁 异常不抛出
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters();
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}
/**
* 单向链表add 返回lastWaiter
* @return
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// lastWaiter CANCELLED 移除
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
firstWaiter = node;
}else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
/**
* 没有打断 0
* interrupted before signalled -1
* interrupted after signalled 1
* @param node
* @return
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
isOnSyncQueue是AQS的方法,功能是判断节点是否在同步队列中
/**
* 节点是否在同步队列中
* @param node
* @return
*/
private boolean isOnSyncQueue(Node node) {
// 在条件队列中
// 或者同时别的线程调用signal,Node会从Condition队列中移除,从移除到进入release队列,中间这段时间prev必然为null
if(node.waitStatus == Node.CONDITION || node.prev == null){
return false;
}
if(node.next != null){
return true;
}
// 可能该Node刚刚最后一个进入release队列,所以是tail,其next必然是null,所以需要从队尾向前查找
return findNodeFromTail(node);
}
transferAfterCancelledWait唤醒前或后被打断
/**
* 唤醒前或者后被打断
* @param node
* @return
*/
private boolean transferAfterCancelledWait(Node node) {
// 唤醒前线程被打断(超时等原因将不再等待) 直接加入到同步队列等待获取锁
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
// 唤醒的时候被打断 等待加入队列(可以获取锁的状态)结束
while (!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
4.2 signal()
- 条件队列中移除first节点
- 跳过cancled节点,将first加入到同步队列直到加入成功
这里并没有唤醒该线程,而是在await()的第4步才可能唤醒该线程
/**
* 1. 条件队列中移除first节点
* 2. 跳过cancled节点,找到一个没有取消的first放入release队列
* @param first
*/
private void doSignal(Node first) {
do {
if((firstWaiter = first.nextWaiter) == null){
lastWaiter = null;
}
first.nextWaiter = null;
}while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal是AQS的方法
/**
* condition队列中节点加入到同步队列中
* @param first
* @return
*/
private boolean transferForSignal(Node first) {
// 无法cas waitStatus, 节点被取消
if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
return false;
}
// p是该Node的前驱
Node p = enq(first);
int ws = p.waitStatus;
// 前驱节点cancled或者变更waitStatus失败,直接唤醒当前线程
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(first.thread);
}
return true;
}
5. 简单注释源码
public abstract class MyAbstractQueuedSynchronizer {
/**
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | ----> | | tail
* +------+ +-----+ +-----+
* </pre>
*/
/** 双端队列的头,只有setHead可以修改 */
private transient volatile Node head;
/** 端队列的尾,只有enq可以修改 */
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
protected final int getState() {
return state;
}
/** 节点自旋还是阻塞的超时时间 */
static final long spinForTimeoutThreshold = 1000L;
protected MyAbstractQueuedSynchronizer() {
}
protected boolean tryAcquire(int arg){
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg){
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 独占模式下线程获取锁的方法,该函数忽略中断,即线程在aquire过程中,中断此线程是无效的
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
/**
* 释放锁
* @param arg
* @return
*/
public final boolean release(int arg) {
if(tryRelease(arg)){
// 每个队列中的线程在获取到锁以后,会将队列中包含该线程的Node节点设置为head,所以head指向的Node即为当前占有锁的线程,也就是当前正在进行释放锁操作的线程
Node h = head;
if(h != null && h.waitStatus != 0){
// 唤醒后续的结点
unparkSuccessor(h);
}
return true;
}
return false;
}
/**
* doAcquireShared与acquireQueued仅有一点区别
* @param arg
*/
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
// 与独占模式下获取锁唯一的区别就在下面这个setHeadAndPropagate函数
setHeadAndPropagate(node, r);
// help GC
p.next = null;
if (interrupted) {
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if(failed){
cancelAcquire(node);
}
}
}
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
/**
*
* @param node
* @param propagate
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置成功获取锁的node节点为队列头head
setHead(node);
/**
* propagate > 0 说明许可还能够继续被线程acquire
* 之前的head == null 未知状态
* 之前的head被设置为PROPAGAT 说明需要往后传递
* 当前head = null 未知状态
* 当前head被设置为PROPAGAT
*/
if(propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0){
Node s = node.next;
if(s == null || s.isShared()){
doReleaseShared();
}
}
}
private void doReleaseShared() {
for(;;){
Node h = head;
if(h != null && h != tail){
int ws = h.waitStatus;
// SIGNAL状态直接唤醒下一个节点
if(ws == Node.SIGNAL){
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
}else if(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
//首节点的状态是0,则通过CAS设置为PROPAGATE,表示如果连接在当前节点后的node的状态如果是shared,则无条件获取锁
continue;
}
}
if(h == head){
break;
}
}
}
/**
* 子类实现
* @param arg
* @return
*/
protected int tryAcquireShared(int arg){
throw new UnsupportedOperationException();
}
private boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 自旋 当且仅当前驱节点是head,尝试获取锁
// 当前驱节点等于head时,说明前驱线程为当前正拥有锁,或者刚刚释放锁并且唤醒了当前节点
if (p == head && tryAcquire(arg)) {
// 设置成功获取锁的node节点为队列头head
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
// 如果前驱节点不是队列head或者获取锁失败,则设置前驱节点waitStatus为SIGNAL,并睡眠
// shouldParkAfterFailedAcquire 前置节点waitStatus设为-1 返回false
// parkAndCheckInterrupt 阻塞当前线程 返回currentThread().isInterrupted(true)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
/**
* 此处在尾部插入node时,先设置node的prev,再CAS修改队列tail指向,修改成功再设置前一个节点的next域
* 队列中,如果某个node的prev!=null,并不一定表示node已经成功插入队列中,如果某个node的前一个节点的next!=null,则该node一定位于队列中
* @param mode 模式
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 双端链表的add操作
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//头尾都为null 初始化以及enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail为空,初始化head和tail节点 head lazy initialize
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// 双端链表的基本操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 线程在获取锁过程中,可能因为出错、被中断或超时而取消获取锁
* @param node
*/
protected void cancelAcquire(Node node){
if (node == null) {
return;
}
//线程引用清空
node.thread = null;
Node pred = node.prev;
// 若前驱节点是CANCELLED,则跳过继续往前找
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// 前驱节点中不是CANCELLED节点,获取pre next指向
Node predNext = pred.next;
// 设置waitStatus值为CANCELLED,标记节点已取消
node.waitStatus = Node.CANCELLED;
// 如果被取消的node是尾部节点,则设置tail指针指向前驱节点,并且设置前驱节点的next指针为null
if(node == tail && compareAndSetTail(node, pred)){
compareAndSetNext(pred, predNext, null);
}else {
int ws;
if(pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred,ws,
Node.SIGNAL))) && pred.thread != null){
Node next = node.next;
if(next != null && next.waitStatus <= 0){
compareAndSetNext(pred, predNext, next);
}
}else {
unparkSuccessor(node);
}
node.next = node;
}
}
/**
* 唤醒后继节点
* @param node
*/
private void unparkSuccessor(Node node) {
// 在独占模式下,waitStatus<0此时为SIGNAL,将SIGNAL标志清除
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// node.next == null || node.next.waitStatus > 0,表示该node.next线程取消了获取锁,则从队列尾部遍历,找到队列中第一个没有取消的线程
if(s == null || s.waitStatus > 0){
s = null;
for(Node t = tail;t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s != null){
LockSupport.unpark(s.thread);
}
}
protected boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){
return true;
}
// ws = 1 CANCELLED 跳过取消的节点
if(ws > 0){
do {
node.prev = pred = pred.prev;
}while (pred.waitStatus > 0);
pred.next = node;
}else {
// 0 或者 PROPAGATE
compareAndSetWaitStatus(pred, ws, Node.CANCELLED);
}
return false;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if(failed){
node.waitStatus = Node.CANCELLED;
}
}
}
/**
* 节点是否在同步队列中
* @param node
* @return
*/
private boolean isOnSyncQueue(Node node) {
// 在条件队列中
// 或者同时别的线程调用signal,Node会从Condition队列中移除,从移除到进入release队列,中间这段时间prev必然为null
if(node.waitStatus == Node.CONDITION || node.prev == null){
return false;
}
if(node.next != null){
return true;
}
// 可能该Node刚刚最后一个进入release队列,所以是tail,其next必然是null,所以需要从队尾向前查找
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node) {
return true;
}
if (t == null) {
return false;
}
t = t.prev;
}
}
/**
* 唤醒前或者后被打断
* @param node
* @return
*/
private boolean transferAfterCancelledWait(Node node) {
// 唤醒前线程被打断(超时等原因将不再等待) 直接加入到同步队列等待获取锁
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
// 唤醒的时候被打断 等待加入队列(可以获取锁的状态)结束
while (!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
/**
* condition队列中节点加入到同步队列中
* @param first
* @return
*/
private boolean transferForSignal(Node first) {
// 无法cas waitStatus, 节点被取消
if(!compareAndSetWaitStatus(first, Node.CONDITION, 0)){
return false;
}
// p是该Node的前驱
Node p = enq(first);
int ws = p.waitStatus;
// 前驱节点cancled或者变更waitStatus失败,直接唤醒当前线程
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(first.thread);
}
return true;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/** 原子性交换state */
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/** 原子性交换head 仅enq调用 */
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** 原子性交换tail 仅enq调用 */
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
static final class Node {
/** 标记当前结点是共享模式 */
static final Node SHARED = new Node();
/** 标记当前结点是独占模式 */
static final Node EXCLUSIVE = null;
/**
* 结点的等待状态 可能的取值包含:CANCELLED、SIGNAL、CONDITION、PROPAGATE和0
* 在同步等待队列中的节点初始值为0,在条件等待队列中的节点初始值为CONDITION
* 在独占模式下,取值为CANCELLED、SIGNAL、0中之一
* 在共享模式下,取值为CANCELLED、SIGNAL、PROPAGATE和0中之一
* 在条件等待队列中,取值为CANCELLED、CONDITION中之一
*/
volatile int waitStatus;
/** 拥有当前结点的线程 */
volatile Thread thread;
/** 线程已经被取消 */
static final int CANCELLED = 1;
/**
* 后续节点需要唤醒
* 节点插入队列时,节点代表的线程睡眠前会将前一个节点的waitStatus置为SIGNAL
* 当前一个节点释放锁时,如果其waitStatus置为SIGNAL,则会唤醒其后下一个节点线程
*/
static final int SIGNAL = -1;
/** 表示节点代表的线程正处于条件等待队列中等待signal信号 */
static final int CONDITION = -2;
/** 在共享模式下使用,表示同步状态能够无条件向后传播 */
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
/**
* 在条件等待队列中,用于指向下一个节点
* 在同步等待队列中,用于标记该节点所代表的线程在独占模式下还是共享模式下获取锁
*/
Node nextWaiter;
public Node() {
}
/**
*
* @param thread 一般为当前线程
* @param mode 排他EXCLUSIVE 共享SHARED
*/
public Node(Thread thread, Node mode) {
this.thread = thread;
this.nextWaiter = mode;
}
public Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(MyAbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
public class ConditionObject implements Condition, Serializable{
private transient Node firstWaiter;
private transient Node lastWaiter;
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
public ConditionObject() {}
/**
* 1. 新建Condition Node包装线程,加入Condition队列
* 2. 释放线程占有的锁
* 3. 在同步等待队列中则睡眠等待 直到被唤醒并加入到同步队列中
* 4. 尝试获取锁 此时节点已经加入到同步队列中
* 5. 处理cancled节点
* 6. 处理异常
* @throws InterruptedException
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){
throw new InterruptedException();
}
// 新建Condition Node包装线程,加入Condition队列
Node node = addConditionWaiter();
// 释放线程占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步等待队列中则睡眠等待 直到被唤醒并加入到同步队列中
while (!isOnSyncQueue(node)){
LockSupport.park(this);
// 检查是否被打断
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){
break;
}
}
// 尝试获取锁 异常不抛出
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){
unlinkCancelledWaiters();
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}
@Override
public void awaitUninterruptibly() {
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
return 0;
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
return false;
}
@Override
public void signal() {
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first);
}
}
@Override
public void signalAll() {
}
/**
* 1. 条件队列中移除first节点
* 2. 跳过cancled节点,找到一个没有取消的first放入release队列
* @param first
*/
private void doSignal(Node first) {
do {
if((firstWaiter = first.nextWaiter) == null){
lastWaiter = null;
}
first.nextWaiter = null;
}while (!transferForSignal(first) && (first = firstWaiter) != null);
}
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if(interruptMode == THROW_IE){
throw new InterruptedException();
}else if (interruptMode == REINTERRUPT) {
selfInterrupt();
}
}
/**
* 没有打断 0
* interrupted before signalled -1
* interrupted after signalled 1
* @param node
* @return
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
* 单向链表add 返回lastWaiter
* @return
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// lastWaiter CANCELLED 移除
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
firstWaiter = node;
}else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
/**
* 单向链表去掉某个节点的过程
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null){
Node next = t.nextWaiter;
if(t.waitStatus != Node.CONDITION){
t.nextWaiter = null;
if(trail == null){
firstWaiter = next;
}else {
trail.nextWaiter = next;
}
}else {
trail = t;
}
t = next;
}
}
}
}