J.U.C简介
Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。并发包的作者是大名鼎鼎的 Doug Lea。我们在接下来的课程中,回去剖析一些经典的比较常用的组件的设计思想
Lock
Lock
在J.U.C
中是最核心的组件,前面我们讲synchronized
的时候说过,锁最重要的特性就是解决并发安全问题。为什么要以Lock
作为切入点呢?如果有同学看过J.U.C
包中的所有组件,一定会发现绝大部分的组件都有用到了Lock
。所以通过Lock
作为切入点使得在后续的学习过程中会更加轻松。
Lock 简介
在
Lock
接口出现之前,Java
中的应用程序对于多线程的并发安全处理只能基于synchronized
关键字来解决。但是synchronized
在有些场景中会存在一些短板,也就是它并不适合于所有的并发场景。但是在Java5
以后,Lock
的出现可以解决synchronized
在某些场景中的短板,它比synchronized
更加灵活
Lock 的实现
Lock
本质上是一个接口,它定义了释放锁和获得锁的抽象方法,定义成接口就意味着它定义了锁的一个标准规范,也同时意味着锁的不同实现。实现Lock
接口的类有很多,以下为几个常见的锁实现:
ReentrantLock
:表示重入锁,它是唯一一个实现了Lock
接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数。
ReentrantReadWriteLock
:重入读写锁,它实现了ReadWriteLock
接口,在这个类中维护了两个锁,一个是 ReadLock,一个是WriteLock
,他们都分别实现了Lock
接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。
StampedLock
:stampedLock
是JDK8
引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock
是一种乐观的读策略,使得乐观锁完全不会阻塞写线程。
Lock 的类关系图
Lock
有很多的锁的实现,但是直观的实现是 ReentrantLock
重入锁
ReentrantLock 重入锁
重入锁,表示支持重新进入的锁,也就是说,如果当前线程 t1 通过调用 lock
方法获取了锁之后,再次调用 lock
,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized
和 ReentrantLock
都是可重入锁。很多同学不理解为什么锁会存在重入的特性,那是因为对于同步锁的理解程度还不够,比如在下面这类的场景中,存在多个加锁的方法的相互调用,其实就是一种重入特性的场景。
重入锁的设计目的
比如调用 demo 方法获得了当前的对象锁,然后在这个方法中再去调用
demo2,demo2 中的存在同一个实例锁,这个时候当前线程会因为无法获得demo2 的对象锁而阻塞,就会产生死锁。重入锁的设计目的是避免线程的死锁。
public class ReentrantDemo{
public synchronized void demo(){
System.out.println("begin:demo");
demo2();
}
public void demo2(){
System.out.println("begin:demo1");
synchronized (this){
}
}
public static void main(String[] args) {
ReentrantDemo rd=new ReentrantDemo();
new Thread(rd::demo).start();
}
}
ReentrantLock 的使用案例
public class AtomicDemo {
private static int count = 0;
static Lock lock = new ReentrantLock();
public static void inc(){
lock.lock();
try {
Thread.sleep(1);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for(int i = 0;i < 1000;i++){
new Thread(()->{
AtomicDemo.inc();
}).start();;
}
Thread.sleep(3000);
System.out.println("result:"+count);
}
}
ReentrantReadWriteLock
我们以前理解的锁,基本都是排他锁,也就是这些锁在同一时刻只允许一个线程进行访问,而读写所在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。读写锁维护了一对锁,一个读锁、一个写锁; 一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量
public class LockDemo {
static Map<String,Object> cacheMap=new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock read = rwl.readLock();
static Lock write = rwl.writeLock();
public static final Object get(String key) {
System.out.println("开始读取数据");
read.lock(); //读锁
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
public static final Object put(String key,Object value){
write.lock();
System.out.println("开始写数据");
try{
return cacheMap.put(key,value);
}finally {
write.unlock();
}
}
}
在这个案例中,通过 hashmap 来模拟了一个内存缓存,然后使用读写所来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。
在执行写操作是,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性
- 读锁与读锁可以共享
- 读锁与写锁不可以共享(排他)
- 写锁与写锁不可以共享(排他)
ReentrantLock 的实现原理
我们知道锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串行执行,从而达到线程安全性的目的。在 synchronized 中,我们分析了偏向锁、轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了 synchronized 的加锁开销,同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。那么在 ReentrantLock 中,也一定会存在这样的需要去解决的问题。就是在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?
AQS 是什么
在 Lock
中,用到了一个同步队列 AQS
,全称 AbstractQueuedSynchronizer
,它是一个同步工具也是 Lock
用来实现线程同步的核心组件。如果你搞懂了AQS
,那么J.U.C
中绝大部分的工具都能轻松掌握。
AQS 的两种功能
从使用层面来说,AQS
的功能分为两种:独占和共享
- 独占锁,每次只能有一个线程持有锁,比如前面给大家演示的
ReentrantLock
就是以独占方式实现的互斥锁 - 共 享 锁 , 允 许 多 个 线 程 同 时 获 取 锁 , 并 发 访 问 共 享 资 源 , 比 如
ReentrantReadWriteLock
AQS 的内部实现
AQS
队列内部维护的是一个FIFO
的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个 Node
其实是由线程封装,当线程争抢锁失败后会封装成 Node
加入到 ASQ
队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 当前线程
*/
volatile Thread thread;
/**
* condition等待队列的后继节点
*/
Node nextWaiter;
/**
* 是否为共享锁
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
/**
*将线程构建成Node,t添加到等待队列
*/
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
/**这个方法会在Condition队列使用,*/
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
释放锁以及添加线程对于队列的变化
当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。
这里会涉及到两个变化:
1、新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
2、 通过 CAS 讲 tail 重新指向新的尾部节点head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下:
这个过程也是涉及到两个变化:
1、 修改 head 节点指向下一个获得锁的节点
2 、新的获得锁的节点,将 prev 的指针指向 null
注* 设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可
ReentrantLock 的源码分析
以 ReentrantLock
作为切入点,来看看在这个场景中是如何使用AQS
来实现线程的同步的ReentrantLock
的时序图。
调用 ReentrantLock 中的 lock()方法,源码的调用过程我使用了时序图来展现
ReentrantLock.lock()
这个是 reentrantLock 获取锁的入口
public void lock() {
sync.lock();
}
Sync
实际上是一个抽象的静态内部类,它继承了AQS
来实现重入锁的逻辑,我们前面说过 AQS
是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中,会继承 AQS
来实现对应场景的功能。
Sync
有两个具体的实现类,分别是:
-
NofairSync
:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁。 -
FailSync
: 表示所有线程严格按照FIFO
来获取锁。
NofairSync.lock
以非公平锁为例,来看看 lock 中的实现
- 非公平锁和公平锁最大的区别在于,在非公平锁中我抢占锁的逻辑是,不管有没有线程排队,我先上来 cas 去抢占一下。
- CAS 成功,就表示成功获得了锁。
- CAS 失败,调用 acquire(1)走锁竞争逻辑。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
CAS 的实现原理
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false.
这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,以及涉及到 state 这个属性的意义。
state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入锁的实现来说,表示一个同步状态。它有两个含义的表示
- 当 state=0 时,表示无锁状态
- 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为
ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入 5 次,那么 state=5。而在释放锁的时候,同样需要释放 5 次直到 state=0其他线程才有资格获得锁
Unsafe 类
Unsafe
类是在 sun.misc
包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe
类开发的,比如 Netty、Hadoop、Kafka
等;
Unsafe
可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程的挂起和恢复、CAS
、线程同步、内存屏障;
而 CAS
就是 Unsafe
类中提供的一个原子操作,第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的 headOffset
的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5
,如果更新成功,则返回 true
,否则返回 false
。
stateOffset
一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapInt
中,去根据偏移量找到对象在内存中的具体位置所以stateOffset
表示 state
这个字段在 AQS
类的内存中相对于该类首地址的偏移量
compareAndSwapInt
在 unsafe.cpp
文件中,可以找到 compareAndSwarpInt
的实现
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env,jobject unsafe,
jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj); //将 Java 对象解析成 JVM 的 oop(普通对象指针),
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根据对象 p和地址偏移量找到地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基于 cas 比较并替换, x 表示需要更新的值,addr 表示 state 在内存中的地址,e 表示预期值UNSAFE_END
AQS.accquire
acquire
是 AQS
中的方法,如果 CAS
操作未能成功,说明 state
已经不为 0,此时继续 acquire(1)
操作
➢ 大家思考一下,acquire
方法中的 1 的参数是用来做什么呢?
这个方法的主要逻辑是
- 通过
tryAcquire
尝试获取独占锁,如果成功返回 true,失败返回 false - 如果
tryAcquire
失败,则会通过addWaiter
方法将当前线程封装成Node
添加到AQS
队列尾部 -
acquireQueued
,将Node
作为参数,通过自旋去尝试获取锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync.tryAcquire
这个方法的作用是尝试获取锁,如果成功返回true
,不成功返回false
它是重写 AQS
类中的 tryAcquire
方法,并且大家仔细看一下 AQS
中 tryAcquire
方法的定义,并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实现的模版方法,那应该定义成 abstract
,让子类来实现呀?大家想想为什么
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}