一、cas自旋原理
1、概念
CAS的全称是Compare-And-Swap,它是CPU并发原语,原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致性问题,是线程安全的。CAS并发原语体现在Java语言中就是sun.misc.Unsafe类的各个方法,调用UnSafe类中的CAS方法。从其命名可以发现,其本质就是比较和替换。
2、手动实现一个自旋锁
private static int num = 0;
public static boolean add(int source, int target) {
int count = 0;
while (true) {
if (num == source) {
num = target;
return true;
} else {
count++;
if (count == 10) {
return false;
}
}
}
}
public static void main(String[] args) {
//线程栅栏,等待所有线程准备完毕后执行
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
//内部使用ReentrantLock重入锁
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
boolean flag = add(0, 1);
if (flag) {
System.out.println(Thread.currentThread().getName() + "更新成功==================");
} else {
System.out.println(Thread.currentThread().getName() + "更新失败");
}
}).start();
}
}
结果:只有一条更新成功
Thread-0更新失败
Thread-7更新失败
Thread-6更新失败
Thread-5更新失败
Thread-4更新失败
Thread-3更新失败
Thread-8更新成功==================
Thread-2更新失败
Thread-1更新失败
Thread-9更新失败
3、底层核心
sun.misc.Unsafe是CAS的底层核心类,Unsafe类中所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务。
以ava.util.concurrent.atomic.AtomicInteger的getAndIncrement方法源码分析
/**
* 当前值自增1
**/
public final int getAndIncrement() {
//valueOffset系统偏移量
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/**
* 获取当前值var5,并加var4
**/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//获取主内存当前值var5
var5 = this.getIntVolatile(var1, var2);
//cas循环等待替换,var5+var4是替换后的值
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
4、CAS缺点
1)循环时间长CPU开销大
2)只能保证一个共享变量的原子操作
3)会引发ABA问题
5、ABA问题介绍及解决
简单通过代码实现下ABA问题,线程Thread-0先将num修改为了1,然后又将num修改成了0;线程Thread-1则认为当前num一直没有经过改变,而将其修改成了10。这里我们可以发现问题,这时候num虽然值仍然是0,但是其实已经不是最开始那个0了,这样在某些情况下就会导致问题。
private static int num = 0;
public static boolean add(int source, int target) {
int count = 0;
while (true) {
if (num == source) {
num = target;
return true;
} else {
count++;
if (count == 10) {
return false;
}
}
}
}
public static void main(String[] args) {
new Thread(() -> {
//将数据更新为1
if (add(0, 1)) {
System.out.println(Thread.currentThread().getName() + "更新num为1成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num为1失败");
}
//将数据更新为0
if (add(1, 0)) {
System.out.println(Thread.currentThread().getName() + "更新num为0成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num为0失败");
}
}).start();
new Thread(() -> {
//将数据更新为10
if (add(0, 10)) {
System.out.println(Thread.currentThread().getName() + "更新num为10成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num为10失败");
}
}).start();
}
结果:
Thread-0更新num为1成功
Thread-0更新num为0成功
Thread-1更新num为10成功
上面我自行实现的自旋锁过程,下面看一个atomic原子类的实现。非常简单
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
new Thread(()->{
atomicInteger.compareAndSet(0,1);
atomicInteger.compareAndSet(1,0);
}).start();
new Thread(()->{
boolean b = atomicInteger.compareAndSet(0, 10);
if (b){
System.out.println("更新为10成功");
}else{
System.out.println("更新为10失败");
}
}).start();
}
结果:
更新为10成功
ABA问题的解决:其实问题的本质原因在于我们的乐观锁只比较了值是否相等,可以通过增加其他属性的比较,例如时间戳、版本号等。这里我们采用AtomicStampedReference类解决该问题。
/**
* 构造方法
* @param initialRef 初始值
* @param initialStamp 初始版本戳
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
/**
* CAS方法
* @param expectedReference 初始值
* @param newReference 替换值
* @param expectedStamp 初始版本戳
* @param newStamp 新版本戳
* @return
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
AtomicStampedReference.Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
}
实例:
public static void main(String[] args) {
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(0, 0);
new Thread(() -> {
atomicStampedReference.compareAndSet(0, 1, 0, 1);
atomicStampedReference.compareAndSet(1, 0, 1, 2);
}).start();
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(0, 1, 0, 1);
if (b) {
System.out.println("更新为10成功");
} else {
System.out.println("更新为10失败");
}
}).start();
}
结果:
更新为10失败
二、ReentrantLock可重入锁
在上一篇基础概念中,我们使用ReentrantLock实现了线程同步问题,代码如下:
/**
* 库存
*/
static class Inventory {
//初始化ReentrantLock实例
Lock lock = new ReentrantLock();
//库存数量
private int num = 100;
//增加库存
public void add(int n) {
//加锁
lock.lock();
try {
num += n;
System.out.println("增加库存后的数量=" + num);
} finally {
//释放锁
lock.unlock();
}
}
//减少库存
public void sub(int n) {
//加锁
lock.lock();
try {
num -= n;
System.out.println("减少库存后的数量=" + num);
} finally {
//释放锁
lock.unlock();
}
}
}
public static void main(String[] args) {
Inventory inventory = new Inventory();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
inventory.add(1);
}).start();
}
for (int i = 0; i < 100; i++) {
new Thread(() -> {
inventory.sub(1);
}).start();
}
}
1、代码内部依赖关系
我们借ReentrantLock看下java内锁的底层结构,后续我们进行逐个节点的分析
2、接下来我们来分析下底层原理:
ReentrantLock位于java.util.concurrent.locks包下,其实中包含三个内部类。
Syn:继承AbstractQueuedSynchronizer(AQS),用于实现同步机制。
FairSync:公平锁对象,继承Syn。
NonfairSync:非公平锁对象,继承Syn。
2.1 AbstractQueuedSynchronizer(AQS)
用来构建锁或其他同步组件的框架,是JDK中实现并发编程的核心,它提供了一个基于FIFO队列,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。
分析其源码,有两个内部类
Node:同步队列的模型
ConditionObject:等待队列的模型
逐一看下其内部源码:
Node源码:
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造函数
Node() { // Used to establish initial head or SHARED marker
}
// 构造函数
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造函数
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
ConditionObject 源码
实现了condition接口,关于condition的学习请看下一小节:三、Condition条件等待与通知
// 内部类
public class ConditionObject implements Condition, java.io.Serializable {
// 版本号
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
// condition队列的头结点
private transient Node firstWaiter;
/** Last node of condition queue. */
// condition队列的尾结点
private transient Node lastWaiter;
/**
* 构造函数
*/
public ConditionObject() { }
/**
* 添加新的waiter到wait队列
*/
private Node addConditionWaiter() {
// 保存尾结点
Node t = lastWaiter;
// 尾结点不为空,并且尾结点的状态不为CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除状态不为CONDITION的结点,对firstWaiter和lastWaiter重新赋值
unlinkCancelledWaiters();
// 将最后一个结点重新赋值给t
t = lastWaiter;
}
// 新建一个结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 尾结点为空
if (t == null)
// 设置condition队列的头结点
firstWaiter = node;
else
// 设置为节点的nextWaiter域为node结点
t.nextWaiter = node;
// 更新condition队列的尾结点
lastWaiter = node;
return node;
}
/**
* 转移first节点到sync队列
*/
private void doSignal(Node first) {
// 循环
do {
// 将下一个节点设为首节点,如果为空
if ( (firstWaiter = first.nextWaiter) == null)
// 设置尾结点为空
lastWaiter = null;
// 设置first结点的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
}
/**
* 转移所有等待队列的节点到同步队列
*/
private void doSignalAll(Node first) {
// condition队列的头结点尾结点都设置为空
lastWaiter = firstWaiter = null;
// 循环
do {
// 获取first结点的nextWaiter域结点
Node next = first.nextWaiter;
// 设置first结点的nextWaiter域为空
first.nextWaiter = null;
// 将first结点从condition队列转移到sync队列
transferForSignal(first);
// 重新设置first
first = next;
} while (first != null);
}
/**
* 过滤掉状态不为CONDITION的节点
* 对firstWaiter和lastWaiter重新赋值
**/
private void unlinkCancelledWaiters() {
// 保存condition队列头结点
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 下一个结点
Node next = t.nextWaiter;
// t结点的状态不为CONDTION状态
if (t.waitStatus != Node.CONDITION) {
// 设置t节点的额nextWaiter域为空
t.nextWaiter = null;
if (trail == null) // trail为空
// 重新设置condition队列的头结点
firstWaiter = next;
else
// 设置trail结点的nextWaiter域为next结点
trail.nextWaiter = next;
if (next == null) // next结点为空
// 设置condition队列的尾结点
lastWaiter = trail;
}
else // t结点的状态为CONDTION状态
// 设置trail结点
trail = t;
// 设置t结点
t = next;
}
}
/**
* 实现Condition接口的signal方法
*/
public final void signal() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒一个等待线程
doSignal(first);
}
/**
* 实现Condition的signalAll方法,唤醒所有线程
*/
public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒所有等待线程
doSignalAll(first);
}
/**
* 与await()区别在于,使用await方法,调用interrupt()中断后会报错,而该方法不会报错。
*/
public final void awaitUninterruptibly() {
// 添加一个结点到等待队列
Node node = addConditionWaiter();
// 获取释放的状态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞当前线程
LockSupport.park(this);
if (Thread.interrupted()) // 当前线程被中断
// 设置interrupted状态
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/**
* 等待,当前线程在接到信号或被中断之前一直处于等待状态
*/
public final void await() throws InterruptedException {
// 当前线程被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
break;
}
// 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等
* 效于:awaitNanos(unit.toNanos(time)) > 0
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
大概了解源码后我们通过图看下同步队列和等待队列的关系:
同步队列是一个双向的链表,每个节点会存储下一个节点的信息,是一种队列的实现。
等待队列是一个单向的链表,只有使用到Condition时才会存在,并且会存在多个。
当等待队列的线程被唤醒会被添加到同步队列的尾部。
2.2 公平锁与非公平锁
二者的区别主要在于获取锁是否和排队顺序有关。当锁呗一个线程持有,其他尝试获取锁的线程会被挂起,加到等待队列中,先被挂起的在队列的最前端。当锁被释放,需要通知队列中的线程。作为公平锁,会先唤醒队列最前端的线程;而非公平锁会唤醒所有线程,通过竞争去获取锁,后来的线程有可能获得锁。
3.3 lock()和unlock()
我们通过本节的开始时提供的例子,代码跟踪发现lock()默认走的是非公平锁:
public ReentrantLock() {
//初始化默认是非公平锁
sync = new NonfairSync();
}
可以通过设置boolean的值设置是公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock()方法走NonFairLock的lock方法
public void lock() {
sync.lock();
}
/**
* 获取锁
*/
final void lock() {
//CAS尝试设置锁状态,占用锁
if (compareAndSetState(0, 1))
//修改状态成功,设置当前线程为独占锁拥有者
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
只有一个线程的时候会直接独占,当存在线程竞争的时候CAS获取会返回false,走acquire(1);走到AQS的acquire方法。
public final void acquire(int arg) {
//走非公平锁的获取锁方法
if (!tryAcquire(arg) &&
//锁获取失败并且添加该线程到等待队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断当前线程
selfInterrupt();
}
逐步看看上面代码中的几个方法
tryAcquire()走到获取非公平锁:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取当前状态
int c = getState();
if (c == 0) {
// 活跃状态,再次尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前线程是否是占用锁的线程
else if (current == getExclusiveOwnerThread()) {
//是当前持有锁的线程,计数加1
//TODO 这里我推测是可重入锁计数的实现,后面去验证
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
添加当前线程到同步队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 将尾节点设置为当前新节点的前继节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置当前节点为tail
if (compareAndSetTail(pred, node)) {
//将当前节点设置为上一节点的下一节点,有点绕
pred.next = node;
return node;
}
}
//尾节点是null
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾节点是null,初始化头尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将node 设置为tail,设置前后节点的prev和next
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued():
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是头并且能重新获取到锁,应该是防止入队列时头结点被释放
if (p == head && tryAcquire(arg)) {
//设置当前节点为头
setHead(node);
p.next = null; // help GC
failed = false;
//返回中断失败
return interrupted;
}
//如果前置节点不是head,也未获取到锁,立即执行中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来分析unlock()方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放
if (tryRelease(arg)) {
Node h = head;
//head不是null,不是活跃状态
if (h != null && h.waitStatus != 0)
//释放锁成功
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//计数减1
int c = getState() - releases;
//当前线程是否是持有锁的线程,不是则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//没有线程持有锁
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
而公平锁获取比非公平锁多了一个判断
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//此处增加了判断,是否有前驱节点在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//判断是否有前驱节点在等待
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
三、Condition条件等待与通知
java的Object类型实现线程等待与通知: 应用Object的wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()。整体上看是通过对象监视器配合完成线程间的等待/通知机制。
Condition与Lock配合完成等待通知机制:针对Object类型的等待与通知,Condition也提供了对应的方式。
针对Object的wait(),wait(long timeout),wait(long timeout, int nanos),Condition提供了以下几个方法:
void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;
boolean await(long time, TimeUnit unit)throws InterruptedException:当前线程进入等待状态直到被通知,支持自定义时间单位
boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个指定时间
还额外提供个
void awaitUninterruptibly(); 与await()区别在于,使用await方法,调用interrupt()中断后会报错,而该方法不会报错。
针对Object的notify(),notifyAll(),Condition提供了以下几个方法:
void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
void signalAll():够唤醒所有等待在condition上的线程,将全部线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
以上锁的方式实际是在AQS中实现的,源码请看上一章节的AQS分析。
Condition与Object方式的不同:
Condition能够支持不响应中断,而通过使用Object方式不支持;
Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
Condition能够支持超时时间的设置,而Object不支持
Condition结合ReentrantLock的使用:
/**
* 库存
*/
static class Inventory {
//初始化ReentrantLock实例
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//库存数量
private int num = 100;
//增加库存
public void add(int n) throws InterruptedException {
//加锁
lock.lock();
try {
//先等待sub的通知
condition.await();
num += n;
System.out.println("增加库存后的数量=" + num);
} finally {
//释放锁
lock.unlock();
}
}
//减少库存
public void sub(int n) throws InterruptedException {
//加锁
lock.lock();
try {
num -= n;
System.out.println("减少库存后的数量=" + num);
//睡1s,为了看add方法接收通知的效果
Thread.sleep(1000);
condition.signal();
} finally {
//释放锁
lock.unlock();
}
}
}
结果分析:按照代码逻辑,先走增加方法,但是被await方法阻塞了,1s后执行sub方法,减少数量后并sleep1s,使用signal方法通知add方法,最终看到sub先输出,add后输出。
减少库存后的数量=99
增加库存后的数量=100
在代码中看到,condition对象实际是调用lock的new ConditionObject()方法,new了一个ConditionObject对象,ReentrantLock的内部Sync继承了AQS,而ConditionObject是AQS的一个内部类,实现了Condition接口。接口内提供了诸多通信机制的方法,可见ReentrantLock、AQS与Condition的紧密关联。相互关系请见本章节开头的图。
有点结论可以提出一下,了解过lock和synchronized之后,发现两种锁前者是基于jvm内存模型的,后者基于代码实现,不知道同学们有没有相同感受。
四、Latch门闩
首先我们写个例子,来理解下门栓的含义:
public static void main(String[] args) throws InterruptedException {
// 使用倒计数门闩器 ,迫使主线程进入等待 ;设置门栓的值为10
CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
//门栓值减1
latch.countDown();
System.out.println("当前门栓值:" + latch.getCount());
}
}).start();
//阻塞主线程,等门栓值为0,主线程执行
latch.await();
System.out.println("主线程执行。。。");
}
结果:从以下结果可以看到,当门栓的值降到0之后,主线程执行了。
当前门栓值:9
当前门栓值:8
当前门栓值:7
当前门栓值:6
当前门栓值:5
当前门栓值:4
当前门栓值:3
当前门栓值:2
当前门栓值:1
当前门栓值:0
主线程执行。。。
接下来我们分析下原理,其中有个内部类Sync,同样继承了AQS
private static final class Sync extends AbstractQueuedSynchronizer
结合上面的例子逐步分析源码,首先初始化了一个CountDownLatch对象:
// 使用倒计数门闩器 ,迫使主线程进入等待 ;设置门栓的值为10
CountDownLatch latch = new CountDownLatch(10);
//构造
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//同步代码块
Sync(int count) {
//设置AQS的state计数
setState(count);
}
用await阻塞主线程:
public void await() throws InterruptedException {
//AQS的获取中断共享锁
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//获取当前值是多少
if (tryAcquireShared(arg) < 0)
//获取共享锁
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//填加获取共享锁类型到同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
if (p == head) {
//前驱节点等于head,尝试获取共享锁,就是获取state的值
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取共享锁成功,设置当前节点为head,释放原head共享锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//阻塞和中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown()减数量,释放共享锁
public void countDown() {
//释放共享锁
sync.releaseShared(1);
}
//AQS释放共享锁
public final boolean releaseShared(int arg) {
//获取state并减1
if (tryReleaseShared(arg)) {
//无线循环并通过CAS释放所有共享锁
doReleaseShared();
return true;
}
return false;
}
五、CyclicBarrier线程栅栏
先看一个使用例子
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "准备就绪");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "到达");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "准备开始");
cyclicBarrier.await();
}
结果:5个线程和main函数进行await,当总数达到6后,开始执行。是不是很简单。
Thread-1准备就绪
Thread-4准备就绪
Thread-0准备就绪
Thread-2准备就绪
Thread-3准备就绪
main准备开始
Thread-1到达
Thread-0到达
Thread-3到达
Thread-2到达
Thread-4到达
看看源码实现:
//构造函数,parties为线程数量
public CyclicBarrier(int parties) {
this(parties, null);
}
//Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//阻塞方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
//引入了Condition等待队列,使用await()方法与signalAll()方法,通过counnt计数
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
六、Semaphore信号量
Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
官方解释是Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
举个例子:
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "占用时间:" + LocalDateTime.now());
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
结果:每次只通过两个线程,等待两秒。
线程Thread-0占用时间:2020-08-24T09:45:31.738
线程Thread-1占用时间:2020-08-24T09:45:31.738
线程Thread-2占用时间:2020-08-24T09:45:33.740
线程Thread-3占用时间:2020-08-24T09:45:33.740
线程Thread-4占用时间:2020-08-24T09:45:35.740
线程Thread-5占用时间:2020-08-24T09:45:35.740
线程Thread-6占用时间:2020-08-24T09:45:37.741
线程Thread-7占用时间:2020-08-24T09:45:37.741
线程Thread-8占用时间:2020-08-24T09:45:39.741
线程Thread-9占用时间:2020-08-24T09:45:39.742
针对上面的例子,我们看下具体的实现原理:
实现了三个内部类,与ReentrantLock是相同的,Syn继承的AQS,公平锁与非公平锁分别继承Sync实现同步。
初始化方法:默认非公平锁,同时定义下通行证的数量。将通行证数量设置到AQS的state。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
protected final void setState(int newState) {
state = newState;
}
获取锁方法:semaphore.acquire();
public void acquire() throws InterruptedException {
//获取共享可中断锁
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁,小于0,则表示当前通行证不足
if (tryAcquireShared(arg) < 0)
//通行证数量不足,创建阻塞队列
doAcquireSharedInterruptibly(arg);
}
跟踪tryAcquireShared(arg)到底层:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取通行证数量
int available = getState();
//减去需要或取得数量
int remaining = available - acquires;
//获取后数量小于0,直接返回获取后数量,大于0,CAS设置state
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
跟踪doAcquireSharedInterruptibly(int arg)方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加共享锁节点到同步队列的尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获得当前节点pre节点
final Node p = node.predecessor();
if (p == head) {
//再次尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取共享锁成功,设置当前节点为head,释放原head共享锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//重组双向链表,清空无效节点,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放锁方法semaphore.release(),跟踪到底层
public final boolean releaseShared(int arg) {
//释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前状态
int current = getState();
//加上要释放的值得到最新的值
int next = current + releases;
//加完后小于当前值,【】抛出异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS设置state
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//是否需要唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒h.nex节点线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
七、Semaphore与Lock的区别(高频面试)
最主要的区别在于,Semaphore可以进行死锁恢复。
我们看下Lock的释放锁源码,以ReentrantLock为例。如果当前线程不是持有锁的线程,则抛出IllegalMonitorStateException异常,也就是说,Lock在unlock前,必须先lock,持有锁。
protected final boolean tryRelease(int releases) {
//计数减1
int c = getState() - releases;
//当前线程是否是持有锁的线程,不是则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//没有线程持有锁
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
而Semaphore则没有这个判断,会直接将设置state的值,增加通行证的数量。分别举两个例子看下。
public static void main(String[] args) {
Lock lock = new ReentrantLock();
//Semaphore semaphore = new Semaphore(1);
new Thread(()->{
lock.unlock();
}).start();
}
结果抛出异常:
Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at com.cloud.bssp.thread.SemaphoreAndLock.lambda$main$0(SemaphoreAndLock.java:26)
at java.lang.Thread.run(Thread.java:748)
下面来看下semaphore的例子
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
System.out.println("当前通行证数量:" + semaphore.availablePermits());
new Thread(()->{
semaphore.release();
}).start();
Thread.sleep(1000);
System.out.println("当前通行证数量:" + semaphore.availablePermits());
}
结果:发现在release之后,数量增加的一个。我们可以利用这个特性去做死锁恢复。
简单模仿下死锁恢复的例子,两个线程一个先占用semaphore1,一个先占用semaphore2,分别sleep5秒,这时候没有释放,在去占用另外一个,发现产生了死锁,线程卡在这里不动了。main方法主线程会在10秒后去判断是否释放锁,没有的话由主线程去释放,这时候发现两个线程分别获取到了锁。
/**
* 死锁恢复
*/
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore1 = new Semaphore(1);
Semaphore semaphore2 = new Semaphore(1);
new Thread(() -> {
try {
semaphore1.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "获取semaphore1");
Thread.sleep(5000);
semaphore2.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "获取semaphore2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
semaphore2.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "获取semaphore2");
Thread.sleep(5000);
semaphore1.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "获取semaphore1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(10000);
//主线程等待十秒,判断两个线程是否执行完毕,是否释放锁
if (semaphore1.availablePermits() != 1) {
System.out.println("发生死锁了,释放semaphore1");
semaphore1.release();
}
if (semaphore2.availablePermits() != 1) {
System.out.println("发生死锁了,释放semaphore2");
semaphore2.release();
}
}
结果:
线程Thread-0获取semaphore1
线程Thread-1获取semaphore2
发生死锁了,释放semaphore1
发生死锁了,释放semaphore2
线程Thread-1获取semaphore1
线程Thread-0获取semaphore2
八、ThreadLocal线程本地变量(高频面试)
顾名思义,ThreadLocal可以理解为线程本地变量,当创建了ThreadLocal变量,那么线程对于ThreadLocal的读取就是相互隔离的,不会产生影响。
8.1 使用实例
先抛个实际使用的例子扔在这,10个线程分别对ThreadLocal进行加1,最终结果都是101,每个线程修改了各自的本地变量。如果是int类型的,结果应该为110,体现了线程本地变量的特性。
/**
* 库存
*/
static class Inventory {
private ThreadLocal<Integer> num = ThreadLocal.withInitial(() -> 100);
//增加库存
public synchronized void add(int n, String threadName) {
//增加库存
num.set(num.get() + n);
System.out.println("线程:" + threadName + ",增加库存后的数量=" + num.get());
}
}
public static void main(String[] args) {
Inventory inventory = new Inventory();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
inventory.add(1, Thread.currentThread().getName());
}).start();
}
}
结果:
线程:Thread-0,增加库存后的数量=101
线程:Thread-2,增加库存后的数量=101
线程:Thread-1,增加库存后的数量=101
线程:Thread-3,增加库存后的数量=101
线程:Thread-7,增加库存后的数量=101
线程:Thread-9,增加库存后的数量=101
线程:Thread-8,增加库存后的数量=101
线程:Thread-6,增加库存后的数量=101
线程:Thread-4,增加库存后的数量=101
线程:Thread-5,增加库存后的数量=101
8.2 源码解读
我很难写出比这篇文章更好的了,所以直接上连接了,不在写了,这篇文章绝对是当前百度能找到最详细的了。
https://www.cnblogs.com/micrari/p/6790229.html
九、Phaser 线程阶段器(本文只介绍简单使用)
在jdk1.7中被引入,能够完成多阶段的任务,并且每个阶段可以多线程并发执行,但是需要当前阶段全部完成才能进入下一阶段,相比于CyclicBarrier或者CountryDownLatch,功能更加强大和灵活。
用法
/**
* 线程数,即学生数量
*/
private static int PARTIES = 5;
static Phaser p = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一题完成");
return false;
case 1:
System.out.println("第二题完成");
return false;
case 2:
System.out.println("第三题完成");
return false;
default:
return true;
}
}
};
private static void firstQuestion() {
System.out.println("线程:" + Thread.currentThread().getName() + ",第一题");
p.arriveAndAwaitAdvance();
}
private static void secondQuestion() {
System.out.println("线程:" + Thread.currentThread().getName() + ",第二题");
p.arriveAndAwaitAdvance();
}
private static void thirdQuestion() {
System.out.println("线程:" + Thread.currentThread().getName() + ",第三题");
p.arriveAndAwaitAdvance();
}
public static void main(String[] args) {
for (int i = 0; i < PARTIES; i++) {
new Thread(() -> {
//线程注册
p.register();
firstQuestion();
secondQuestion();
thirdQuestion();
}).start();
}
}
结果:五个线程分阶段完成了每个题目
线程:Thread-1,第一题
线程:Thread-3,第一题
线程:Thread-2,第一题
线程:Thread-0,第一题
线程:Thread-4,第一题
第一题完成
线程:Thread-4,第二题
线程:Thread-3,第二题
线程:Thread-2,第二题
线程:Thread-1,第二题
线程:Thread-0,第二题
第二题完成
线程:Thread-0,第三题
线程:Thread-4,第三题
线程:Thread-1,第三题
线程:Thread-3,第三题
线程:Thread-2,第三题
第三题完成
十、Exchanger 线程数据交换器 (本文只介绍简单使用)
Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,当第一个线程调用了exchange()方法后,当前线程会进入阻塞状态,直到第二个线程也执行了exchange()方法,交换数据,继续执行。
使用实例
/**
* 初始化string类型的Exchanger
*/
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
String flag1 = "111";
System.out.println(Thread.currentThread().getName() + "交换前flag1=" + flag1);
try {
//交换数据,并进入阻塞
flag1 = exchanger.exchange(flag1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "交换后flag1=" + flag1);
}).start();
Thread.sleep(1000);
new Thread(() -> {
String flag2 = "222";
System.out.println(Thread.currentThread().getName()+ "交换后flag2=" + flag2);
try {
//交换数据,唤醒上一个线程
flag2 = exchanger.exchange(flag2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "交换前flag2=" + flag2);
}).start();
}
结果:在交换过后,flag1和flag2的值发生了互换。
Thread-0交换前flag1=111
Thread-1交换后flag2=222
Thread-1交换前flag2=111
Thread-0交换后flag1=222