AbstractQueuedSynchronizer源码

不多BB,开门见山,我是参考下面的博文的,不是抄袭,读书人叫参考
参考博文:https://www.cnblogs.com/waterystone/p/4920797.html
AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,并可以实现独占锁和共享锁
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

  • getState():获取当前同步状态
  • setState():设置当前同步状态
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
    AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态

这个是AQS的内部类,其中的成员变量代表的含义:

  • exclusive:独占式,保证一次只有一个线程可以经过阻塞点,只有一个线程可以获取到锁。

  • shared:共享式,可以允许多个线程阻塞点,可以多个线程同时获取到锁。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。

  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

  • 0:新结点入队时的默认状态。

  • prev:前继节点;

  • next:后继节点;

  • nextWaiter:存储condition队列中的后继节点;

  • thread:当前线程。

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false,需要子类实现。

  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false,需要子类实现。

  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,需要子类实现。

  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false,需要子类实现。

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
//------省略-------

独占锁

原理:在 AQS 内部,通过维护一个FIFO 队列来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。
当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。

这是获取锁的方法,先通过tryAcquire获取锁,这个tryAcquire如何获取锁是需要子类去实现的,如果获取失败就调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

那么我先看addWaiter(Node.EXCLUSIVE),addWaiter方法主要是在队列中添加一个节点,并且返回新添加的节点
将当前线程的线程消息和独占式节点构造一个新的节点,将新的节点使用CAS添加到队列尾部。
如果更新失败(存在并发竞争更新),则进入enq方法进行添加

 private Node addWaiter(Node mode) {
//将当前线程的线程消息和独占式节点构造一个新的节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {//判断尾节点是否为空,即判断同步队列是否含有元素
            node.prev = pred;
 //如果同步队列不为空,就通过CAS将新的节点设置为尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
//若同步队列为空,或者是竞争太激烈添加节点失败时就进入enq方法进行空队列的元素的添加,主要是通过不断的循环添加节点实现
        enq(node);
        return node;
    }

若同步队列为空,则进入enq方法进行空队列的元素的添加,或者是队列添加遇到竞争添加失败通过这个方法的死循环不断重试往队列利用cas将节点添加到结尾为止,知道是这么回事就行,不用太过纠结这个方法

//将节点插入队列,必要时进行初始化
//通过 循环+CAS 在队列中成功插入一个节点后返回
//此方法是在队列为空或者是竞争太激烈调用addWaiter方法时添加节点失败才进行操作的
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //判断尾节点是否为空,即判断队列中是否有元素
        if (t == null) { 
            //如果没有元素,就将一个节点添加到队列中
            // 然后将这个新加入的节点设置为头节点和尾节点,并且进行第二次循环,
          //注意!!!!这个头结点只是一个dummy ,只有用于方便遍历索引的牵头节点而已
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //将队列的尾节点tail节点设置为添加节点的前节点
            node.prev = t;
            //通过CAS将需要添加到队列的节点设置为尾节点,这个方法是遇到竞争时使用,可以通过循环不断的添加,直至成功
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued()方法主要作用是不断尝试拿锁。拿到锁后,会清空条件队列中被取消的节点,如果线程的前缀节点不是头结点则将线程阻塞。在线程被唤醒后,如果中间没有遇到中断(interrupt()方法)就会返回false,如果线程在阻塞状态被调用了interrupt()方法就会返回true,然后执行selfInterrupt()方法

public void interrupt(): 设置当前线程的中断标志位,非WAITING线程不会受任何影响的,仅此而已。
public boolean isInterrupted(): 判断当前线程中断标志位是否被标记为中断,不会清除标志位
public static boolean interrupted():这是一个静态方法,返回当前线程是否标记中断,同时清除标志位

向队列添加完节点后,调用acquireQueued:

  • 1、获取当前节点的前驱节点p
  • 2、判断当前p节点是否是头结点,如果p是头节点才能尝试获得锁
  • 3、p是头节点并且获取锁成功则把当前节点设置为头结点,把之前的头结点从队列中移除,等待垃圾回收(没有对象引用)
  • 4、p不是头节点或者获取锁失败就调用shouldParkAfterFailedAcquire方法检测当前节点是否可以被挂起(p节点是SIGNAL才会返回true并挂起,如果不是SIGNAL和取消状态,就通过cas将p修改为SIGNAL,如果cas失败了就直接返回false通过外层循环在就行上述一系列操作),并且把取消状态的线程节点剔除,如果存在其他的状态就把它改成可以安全挂起的状态进行后续的挂起操作

为什么前序节点是头节点head 才能去获取锁呢,这里理由有两个:

  • 1、初始化队列时head 只是用于方便遍历的牵引节点,
  • 2、后续的head 节点代表已经获取同步状态的,轮到后续的节点去获取
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的前继节点
            final Node p = node.predecessor();
            //判断当前节点的前驱节点是否为头节点,
          //为什么前序节点是头节点head 才能去获取锁呢,这里理由有两个:
//1、初始化队列时head 只是用于方便遍历的牵引节点,
//2、后续的head 节点代表已经获取同步状态的,轮到后续的节点去获取
            //如果是就尝试获取锁,前缀节点p不是头节点则跳过if,进行下面的阻塞
            if (p == head && tryAcquire(arg)) {
                //如果获取同步状态成功就将当前节点设置为head节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果p不是head或者未成功获取锁则根据前驱节点判断是否要阻塞。
            //如果阻塞过程中被中断,则置interrupted标志位为true。
            //shouldParkAfterFailedAcquire方法在前驱状态不为SIGNAL的情况下都会循环重试获取锁。
            //parkAndCheckInterrupt方法会阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) &&
//通过LockSupport挂起线程,等待唤醒   
  parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //如果锁获取失败则取消获取
        if (failed)
            cancelAcquire(node);
    }
}

这里主要做的是:

  • 1、将需要挂起的节点,也就是前缀节点为SIGNAL状态的节点,直接返回True,后面会把它挂起
  • 2、将取消的节点剔除队列,也就是waitStatus 大于0的节点
  • 3、其他状态的话就通过CAS将节点的waitStatus修改为SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
//如果节点的前一个节点的状态是SIGNAL,代表是需要挂起的状态,返回true去执行挂起
    if (ws == Node.SIGNAL)
        return true;
//如果ws > 0,表示前一个节点已被取消,删除状态是已取消的节点;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {   
//如果前面的两种情况没有,需要设置前继节点的状态为SIGNAL,以便下次循环可以安全挂起pred的后续节点。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

线程通过parkAndCheckInterrupt进行挂起,主要是调用了LockSupport.park,它的作用就是挂起线程,内部是使用了native方法挂起,LockSupport.park的恢复的条件为
1:线程调用了unpark; 2:其它线程中断了线程,被interrupt()

此时线程就正在的被阻塞阻塞了,AQS正在阻塞线程的方法就是 LockSupport.park

 //如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport提供的是一个许可,如果存在许可,线程在调用park的时候,会立马返回,此时许可也会被消费掉,如果没有许可,则会阻塞。调用unpark的时候,如果许可本身不可用,则会使得许可可用

LockSupport类的关键方法unpark和park都是native方法,unpark为线程提供“许可(permit)”,线程调用park则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。

在Java5里是用wait/notify/notifyAll来同步的。wait/notify机制有个很蛋疼的地方是,比如线程B要用notify通知线程A,那么线程B要确保线程A已经在wait调用上等待了,否则线程A可能永远都在等待。编程的时候就会很蛋疼。

park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

public class LockSupport {
//.......省略
  public static void park(Object blocker) {
//如果被唤醒,查看自己是不是被中断的。
        Thread t = Thread.currentThread();
//设置 t 为要阻塞线程
        setBlocker(t, blocker);
//将当前线程挂起,这是个native方法
        UNSAFE.park(false, 0L);
//将许可证置空
        setBlocker(t, null);
    }
private static void setBlocker(Thread t, Object arg) { 
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }
 public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
.......//省略
}

将当前线程的中断标志位,置为true

//将当前线程的中断标志位,置为true。这里是补充一个中断位
static void selfInterrupt() {
//线程的中断标志位设置为true
    Thread.currentThread().interrupt();
}

这个就是AQS独占锁获取锁的实现,获取锁失败就将线程转化为排队进行阻塞

我们看看独占锁的解锁的实现
tryRelease(int arg) 方法应该由实现AQS的子类来实现具体的逻辑。

首先通过tryRelease方法释放锁如果释放锁成功,执行第2步。
通过调用unparkSuccessor() 方法来唤醒头结点的后继节点。该方法内部是通过LockSupport.unpark(s.thread);来唤醒后继节点的

 public final boolean release(int arg) {
        //尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

LockSupport.unpark唤醒节点线程

 private void unparkSuccessor(Node node) {  
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
//获取头节点的next节点,就是需要唤醒的节点
        Node s = node.next;
//头结点的后续节点是空或者等待是取消状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
//LockSupport.unpark唤醒节点线程
            LockSupport.unpark(s.thread);
    }

共享锁

原理:共享类型的节点获取共享同步状态后,如果后继节点也是共享类型节点,当前节点则会唤醒后继节点。这样,多个节点线程即可同时获取共享同步状态。

接下来看看共享锁的实现:
使用tryAcquireShared试获取的锁,如果获取失败执行doAcquireShared方法。tryAcquireShared方法是需要子类去实现的,主要还是子类去定义线程在共享模式下如何去获取同步状态

public final void acquireShared(int arg) {
    //尝试获取的锁,如果获取失败执行doAcquireShared方法。
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

我们在看看doAcquireShared方法:

  • 1、先调用addWaiter(Node.SHARED)将线程设置为共享模式添加队列中
  • 2、获取当前节点node的前驱节点p
  • 3、判断当前p节点是否是头结点,如果p是头节点才能尝试获得锁
  • 4、p是头节点并且获取锁成功则调用setHeadAndPropagate方法,将当前节点设置为head节点,然后判断当前线程是否获取到了乐观锁,判断之前的头节点p的waitStatus是否是SIGNAL或者当前节点是否是SIGNAL等,然后判断当前节点的后续节点是否是乐观模式,是则调用doReleaseShared进行唤醒
  • 5、如果获取锁失败或者是p不是头节点则调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt将线程挂起
private void doAcquireShared(int arg) {
    //将共享锁节点添加进同步队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
//线程自旋
        for (;;) {
            //获取当前节点的前驱节点
            final Node p = node.predecessor();
            //如果当前节点的前驱节点是head节点
            if (p == head) {
                //尝试获取共享锁
                int r = tryAcquireShared(arg);
                //大于等于0代表获取锁成功
                //一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                if (r >= 0) {
                    /**
                     * 这个方法主要是如果后继节点是共享类型,唤醒后继节点。主要做以下两件事:
                     * 1. 在获取共享锁成功后,设置自己为head节点 
                     * 2. 将后继节点唤醒,头节点的waitStatus设为propagate
*这个方法是共享锁的核心:通过将当前节点设置为头节点,后又可以通过外面代码的循环,不断的寻找共享锁的节点,并且不断的将其唤醒
                     */
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //shouldParkAfterFailedAcquire根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。
            //阻塞当前线程并且检查当前线程是否被中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 该方法实现某个node取消获取锁,取消正在进行的获取尝试
            cancelAcquire(node);
    }
}

我们在看看setHeadAndPropagate,将当前节点设置为head节点,然后判断当前线程是否获取到了共享锁也就是propagate变量大于0然后判断之前的头节点p的waitStatus是否是SIGNAL或者当前节点是否是SIGNAL等,然后判断当前节点的后续节点是否是共享模式,是则调用doReleaseShared进行唤醒,共享的就调用doReleaseShared(),doReleaseShared()是共享锁的唤醒方法,

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
//将当前节点设置为头节点,这是很关键的一步,将当前节点设置为头节点后又可以通过外面代码的循环,不断的寻找共享锁的节点,并且将其唤醒
        setHead(node);
  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();//将后继节点唤醒,头节点的waitStatus设为propagate
        }
    }

doReleaseShared主要作用是将后继节点唤醒,头节点的waitStatus设为propagate。

在死循环内:

  • 1、获取head节点的waitStatus;

  • 2、如果head节点的waitStatus为SIGNAL,通过cas将head节点的waitStatus改为0,必须确保cas执行成功,否则通过新一轮循环,再次执行cas操作直至成功为止;(死循环与cas构成自旋锁保证cas执行成功)

  • 3、如果第二步执行成功,调用unparkSuccessor方法唤醒后继节点;

  • 4、第三步执行完成后,开始新一轮的循环,判断如果head节点的waitStatus为0,通过cas将head节点的waitStatus改为propagate,必须确保cas执行成功,否则通过新一轮循环,再次执行cas操作直至成功为止;(死循环与cas构成自旋锁保证cas执行成功)

由此可见,head节点的waitStatus为SIGNAL时,通过两步改为propagate:

compareAndSetWaitStatus(h, Node.SIGNAL, 0)

compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

为什么要经过两步,不直接把SIGNAL改为propagate呢?原因在unparkSuccessor方法。如果直接把SIGNAL改为propagate,则在unparkSuccessor方法里又会被设置为0。

5、在第四步完成后,判断当前head节点是否发生改变,如果没有发生改变,break退出死循环。在第三步唤醒后继节点后,后继节点(所在的线程)会将自己设置为头节点,此时head节点就会发生改变,对新head节点继续执行循环,从而实现release propagate。新head节点也会调用到doReleaseShared方法,这样会存在多个线程同时调用doReleaseShared方法,执行死循环里的逻辑。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果节点的状态为SIGNAL
            if (ws == Node.SIGNAL) {
                //就将SIGNAL变为0,如果CAS失败,则进入unparkSuccessor,相当与释放锁
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                //唤醒node的后继节点(如果存在的话)。
                unparkSuccessor(h);
            }
            //将节点状态变为PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;    
        }
        if (h == head)       
            break;
    }
}

unparkSuccessor主要是如果节点的waitStatus为负,通过cas设置为0也就是取消状态,后续会将其进行剔除;找到有效的后继节点,调用unpark方法

private void unparkSuccessor(Node node) {
   
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

   
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

因此acquireShared的主要逻辑就是尝试加锁,如果允许其他线程继续加锁,那么唤醒后继线程,如果失败,那么入队阻塞等待。

共享锁的释放:
内部就是调用的doReleaseShared()

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容