Java - ReentrantLock原理

1. 非公平锁实现原理

加锁解锁流程

先从构造器开始看,默认为非公平锁实现,NonfairSync 继承自 AQS

public ReentrantLock() {
    sync = new NonfairSync();
}

没有竞争时

第一个竞争出现时

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这是 state 已经是 1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列
    • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的
    • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获取锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  3. 进入 parkAndCheckInterrupt,Thread-1 park(灰色表示)

再次有多个线程经历上述过程竞争失败,变成这个样子

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state=1
  • head 指向刚刚 Thread-1 所在的Node,该 Node 清空 Thread
  • 原本的head因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的提现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thead-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码

先从 NonfairSync 的 lock 方法开始看

final void lock() {
    // 第一次尝试CAS获取锁,将state的值从0改为1
    if (compareAndSetState(0, 1))
        // 如果加锁成功,设置exclusiveOwnerThread为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果加锁失败,走下面这个流程,这是个模板方法
        acquire(1);
}

AQS 里面的 acquire 方法

public final void acquire(int arg) {
    // 第二次尝试CAS获取锁
    if (!tryAcquire(arg) &&
        // 如果再次或者锁失败,则需要创建Node节点放到阻塞队列(双向链表)中,并阻塞当前线程
        // 这个后续再说,先看tryAcquire里面的逻辑
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

NonfairSync 里面的具体实现 tryAcquire 方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取state的值
    int c = getState();
    // 如果state为0,则去获取锁,如果成功,设置exclusiveOwnerThread为当前线程,返回true
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果exclusiveOwnerThread是当前线程(锁重入的实现,对state的值进行累加)
    // 将累加后的state的值进行更新,返回true
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 如果上述流程都不满足,则获取锁失败,返回false
    return false;
}

如果 tryAcquire 方法返回false,流程会继续向下走 addWaiter 方法和 acquireQueued 方法

private Node addWaiter(Node mode) {
    // 根据当前线程创建一个Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // 如果链表有节点,将新创建的当前线程节点放到链表末尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果链表还没有节点,将会先创建一个无线程的空节点作为占位节点,为了用于唤醒后继节点使用
    // 然后再将新创建的当前线程节点放到链表末尾
    // 注意这时线程还处于waitStatus为0的活跃状态,需要后续acquireQueued方法来进行park
    enq(node);
    return node;
}

AQS 里面的 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 如果前置节点是头节点,说明当前节点是第二节点
            // 第三次尝试CAS获取锁
            // 如果获取成功,将当前节点作为头节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果获取失败,先走shouldParkAfterFailedAcquire方法来改变前置节点的waitStatus的值
            // 如果前置节点waitStatus为0,则CAS将waitStatus改为-1,返回false,进入下次循环
            // 如果前置节点waitStatus为-1,则返回true,走parkAndCheckInterrupt方法来park当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 取消获取逻辑
        if (failed)
            cancelAcquire(node);
    }
}
解锁源码

先从 ReentrantLock 的 unlock 方法开始看

public void unlock() {
    sync.release(1);
}

AQS 里面的 release 方法

public final boolean release(int arg) {
    // 尝试去解锁,里面包含了锁重入对state递减操作,如果解锁成功,才会走下面的逻辑
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 找到头节点之后的最近一个被阻塞的节点,并将其线程unpark
            unparkSuccessor(h);
        return true;
    }
    return false;
}

让我们先看下 tryRelease 方法,这个需要在 Sync 的具体实现中来看

protected final boolean tryRelease(int releases) {
    // 对state值进行递减,可能因为锁重入导致state值大于1
    int c = getState() - releases;
    // 如果当前线程不是持有锁线程,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果state减为0,说明已经释放了锁,将exclusiveOwnerThread置为null,返回free为true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 重新设置state的值
    setState(c);
    return free;
}

AQS 里面的 unparkSuccessor 方法

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果头节点的waitStatus小于0,则CAS将其修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取后驱节点,如果后驱节点状态大于0(已取消)
    // 从链表尾部开始循环,找到最近的一个后驱节点waitStatus小于等于0的来进行unpark
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果这个后驱节点不为空,对其线程进行unpark操作
    if (s != null)
        LockSupport.unpark(s.thread);
}

这时候非公平锁就和公平锁就有不同之处了,如果这时候恰巧有个线程来和刚刚被 unpark 的线程进行竞争锁,是有可能刚来的线程获取到锁,刚刚被唤醒的线程因没有获取到锁而再次被 park,这时候得跳到 AQS 里面的 acquireQueued 方法来看,因为线程一直在这死循环呢

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 如果前置节点是头节点,说明当前节点是第二节点
            // 再次尝试CAS获取锁,这时候就是非公平锁的体现
            // 有可能刚刚被唤醒的线程再次获取锁失败,从而走下面的逻辑再次被park
            // 如果获取成功,将当前节点作为头节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果获取失败,先走shouldParkAfterFailedAcquire方法来改变前置节点的waitStatus的值
            // 如果前置节点waitStatus为0,则CAS将waitStatus改为-1,返回false,进入下次循环
            // 如果前置节点waitStatus为-1,则返回true,走parkAndCheckInterrupt方法来park当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 取消获取逻辑
        if (failed)
            cancelAcquire(node);
    }
}

2. 公平锁实现原理

公平锁的整体实现流程和非公平锁差别不是很大,但还有一些细节地方稍有不同,让我们直接从源码入手

公平锁源码
// 与非公平锁主要区别在于tryAcquire方法的实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 先检查AQS队列中是否有前驱节点,没有才去竞争
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // h != t 时表示队列中有Node
    return h != t &&
        // (s = h.next) == null 表示队列中还有没有老二
        // 或者队列中老二线程不是此线程
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

3. 可重入原理

可重入原理上面已经提到过,这里就不赘述了。
当加锁时,如果当前线程为锁持有线程,则对state的值进行累加。
当解锁时,对state的值进行递减,直至state减至0时,解锁完成。

4. 可打断原理

不可打断模式

在此模式下,即使它被打断,扔会驻留在 AQS 队列中,等获得锁后方能继续运行(是继续运行!只是打断标记被设置为 true)

// Sync继承自AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是true,则park会失效
        LockSupport.park(this);
        // interrupted会清除打断标记
        return Thread.interrupted();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 还是需要获取锁后,才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果是因为interrupt被唤醒,返回打断状态为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果打断状态为true
            selfInterrupt();
    }

    static void selfInterrupt() {
        // 重新产生一次中断
        Thread.currentThread().interrupt();
    }
}
可打断模式
static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果没有获得到锁,进行(一)
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // (一)可打断的获取锁流程
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 在park过程中如果被interrupt会进入此
                    // 这时候抛出异常,而不会再次进入for (;;)
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

5. 条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

await 流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0

signal 流程

假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1

Thread-1 释放锁,进入 unlock 流程,略

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容