1.锁的可重入性
“可重入锁”是指当一个线程调用 object.lock()获取到锁,进入临界区后,再次调用object.lock(),仍然可以获取到该锁。显然,通常的锁都要设计成可重入的,否则就会发生死锁。
在一个synchronized方法method1()里面调用另外一个synchronized方法method2()。如果synchronized关键字不可重入,那么在method2()处就会发生阻塞,这显然不可行。
public void synchronized method1() {
// ... method2();
// ...
}
public void synchronized method2() {
// ...
}
Lock接口:
void lock();//获取锁
void lockInterruptibly() throws InterruptedException; //获取锁,可以被其他线程中断
boolean tryLock();//获取锁,非阻塞,基于非公平锁tryAcquire,操作不成功直接返回false
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; //获取锁。非阻塞,有时间限制
void unlock(); //释放锁
Condition newCondition();
ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中:
public class ReentrantLock implements Lock, Serializable {
private final ReentrantLock.Sync sync;
public void lock() {
this.sync.acquire(1);
}
public void unlock() {
this.sync.release(1);
}
}
可重入锁:synchronized、ReentrantLock
2.公平锁vs非公平锁
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。
ReentrantLock构造方法可以看出,会传入一个布尔类型的变量fair指定锁是公平的还是非公平的,默认为非公平的。其实是为了提高效率,减少线程切换(重入锁再次获取的时候非公平锁可以减少线切换)。
public ReentrantLock() {
this.sync = new ReentrantLock.NonfairSync();
}
public ReentrantLock(boolean fair) {
this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
}
ReentrantLock的类结构*
其中:
ReentrantLock implements Lock, Serializable
继承并实现了Lock接口的lock、tryLock、unlock、newCondition方法,但是方法的具体实现是在其静态内部类:ReentrantLock.Sync,abstract static class Sync extends AbstractQueuedSynchronizer
和Sync的两个子类NonfairSync static final class NonfairSync extends ReentrantLock.Sync
与static final class FairSync extends ReentrantLock.Sync
实现
3.锁实现的基本原理
Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。
此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:
- 需要一个state变量,标记该锁状态,0表示当前对象没有被线程独占。state变量至少有两个值:0、1。对state变量的操作,用CAS保证线程安全。
- 需要记录当前是哪个线程持有锁。
- 需要底层支持对一个线程进行阻塞或唤醒操作。
- 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用CAS
具体的实现以ReentrantLock为例。
要素1和2,在上面两个类中有对应的体现
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// ...
private transient Thread exclusiveOwnerThread; // 记录持有锁的线程
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private volatile int state; // 记录锁的状态,通过CAS修改state的值。
// ...
}
state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。
要素3:Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。Synchronizer的唤醒是唤醒所有线程。
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
有一个LockSupport的工具类,对这一对原语做了简单封装:
public class LockSupport {
// ...
private static final Unsafe U = Unsafe.getUnsafe();
public static void park() {
U.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
} }
在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Threadthread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。LockSupport.park()/unpark()。这对函数非常关键,Concurrent包中Lock的实现即依赖这一对操作原语。
Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。个LockSupport的工具类,对这一对原语做了简单封装
要素4:在AQS中利用双向链表和CAS实现了一个阻塞队列
public abstract class AbstractQueuedSynchronizer {
// ...
static final class Node {
volatile Thread thread; // 每个Node对应一个被阻塞的线程
volatile Node prev;
volatile Node next;
// ...
}
private transient volatile Node head;
private transient volatile Node tail;
// ...
}
阻塞队列是整个AQS核心中的核心。
初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,再有线程就在node后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。
4.公平与非公平的lock()实现差异
ReentrantLock的lock方法:
public void lock() {
this.sync.acquire(1);
}
public final void acquire(int arg) {
if (!this.tryAcquire(arg) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
NonfairSync:未判断当前线程位置直接去获取锁,没排队是非公平的
FairSync: hasQueuedPredecessors()判断位置是否在队列头部,是排队的是公平的
5.阻塞队列与唤醒机制
下面进入锁的最为关键的部分,即acquireQueued(...)
在AbstractQueuedSynchronizer类中:
public final void acquire(int arg) {
if (!this.tryAcquire(arg) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
tryAcquire先尝试获取一下,如果获取不不到然后继续往下进行。
先说addWaiter(...)方法,就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);
AbstractQueuedSynchronizer.Node oldTail;
do {
while(true) {
oldTail = this.tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
break;
}
this.initializeSyncQueue();
}
} while(!this.compareAndSetTail(oldTail, node));
oldTail.next = node;
return node;
}
创建节点,尝试将节点追加到队列尾部。获取tail节点,将tail节点的next设置为当前节点。
如果tail不存在,就初始化队列。
在addWaiter(...)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(...)方法完成。线程一旦进入acquireQueued(...)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(...)返回。
进入acquireQueued(...),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。
final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
boolean interrupted = false;
try {
while(true) {
AbstractQueuedSynchronizer.Node p = node.predecessor();
if (p == this.head && this.tryAcquire(arg)) {
this.setHead(node);
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) {
//获取到锁之后执行中断响应操作
interrupted |= this.parkAndCheckInterrupt();
}
}
} catch (Throwable var5) {
this.cancelAcquire(node);
if (interrupted) {
selfInterrupt();
}
throw var5;
} }
首先,acquireQueued(...)方法有一个返回值,表示什么意思呢?虽然该方法不会中断响应(也就是,在队列中阻塞的时候,有别的线程唤醒此线程,去中断它,并不会得到响应),但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true,在parkAndCheckInterrupt方法中进行响应;否则,返回false。
基于这个返回值,才有了下面的代码:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
当 acquireQueued(...)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。
阻塞就发生在下面这个方法中:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。
park()方法返回有两种情况。
- 其他线程调用了unpark(Thread t)。
- 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
- 也正因为LockSupport.park()可能被中断唤醒,acquireQueued(...)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
- 被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true
6.unlock()实现分析
unlock不区分公平还是非公平。
public void unlock() {
this.sync.release(1);
}
当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。
public final boolean release(int arg) {
if (this.tryRelease(arg)) {
AbstractQueuedSynchronizer.Node h = this.head;
if (h != null && h.waitStatus != 0) {
this.unparkSuccessor(h);
}
return true;
} else {
return false;
}}
对于tryRelease方法:
首先计算当前线程释放锁后的state值。如果不为0返回空,如果等于零设置排他线程为null,返回true
如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以进行释放锁的操作。
此时设置state,没有使用CAS,因为是单线程操作。
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = this.getState() - releases;
if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
} else {
boolean free = false;
if (c == 0) {
free = true;
this.setExclusiveOwnerThread((Thread)null);
}
this.setState(c);
return free;
}
}
release()里面做了两件事:tryRelease(...)方法释放锁;unparkSuccessor(...)方法唤醒队列中的后继者。