前言
保护共享资源--加锁实现
public class TestAccount {
public static void main(String[] args) {
AccountUnsafe account = new AccountUnsafe(10000);
Account.demo(account);
}
}
class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this){
return balance;
}
}
@Override
public void withDraw(Integer account) {
synchronized (this){
balance -= account;
}
}
}
interface Account{
//获取余额
Integer getBalance();
//取款
void withDraw(Integer account);
/**
* 方法内会启动1000个线程,每个线程做 -10 操作
* 如果初始余额为10000,那么正确结果应当是0
* @param account
*/
static void demo(Account account){
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(()->{
account.withDraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread :: start);
ts.forEach(t ->{
try{
t.join();
}catch (InterruptedException e){
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost : " + (end - start)/1000000 + "ms");
}
}
执行结果:
0 cost : 333ms
保护共享资源--无锁实现
public class TestAccount {
public static void main(String[] args) {
AccountUnsafe account = new AccountUnsafe(10000);
Account.demo(account);
AccountCas accountCas= new AccountCas(10000);
Account.demo(accountCas);
}
}
class AccountCas implements Account{
private AtomicInteger balance;
public AccountCas(int balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withDraw(Integer account) {
while (true){
//获取余额的最新值
int prev = balance.get();
int next = prev - account;
//真正修改
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this){
return balance;
}
}
@Override
public void withDraw(Integer account) {
synchronized (this){
balance -= account;
}
}
}
interface Account{
//获取余额
Integer getBalance();
//取款
void withDraw(Integer account);
/**
* 方法内会启动1000个线程,每个线程做 -10 操作
* 如果初始余额为10000,那么正确结果应当是0
* @param account
*/
static void demo(Account account){
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(()->{
account.withDraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread :: start);
ts.forEach(t ->{
try{
t.join();
}catch (InterruptedException e){
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost : " + (end - start)/1000000 + "ms");
}
}
可以看到使用原子类的效率更高
执行结果:
0 cost : 341ms
0 cost : 228ms
1. CAS是什么?
CAS全称为 compare and swap,或compare and exchange.
1.1 CAS过程
CAS操作涉及三个操作数:内存值V、预期值A、新值B。
同步方式: 读取V值作为预期值A,执行多步计算来获得新值 B,如果内存值V 与预期值A相等,则表示这段时间没有别的线程去修改内存值V,然后将内存值V 的值更新为 B值。若内存值V的值与预期值A不相等,则说明有其他线程修改了该处的值,此时该线程继续读取V值,循环该过程。
-
什么是ABA问题?
其他线程修改了数次V值,修改之后的值和原值相等。此时当前线程无法正确判断这个对象是否被修改过。 -
如何解决ABA问题?
给原值V添加一个版本号,每次修改之后,更新版本号。
1.2 底层实现
1.2.1 java.util.concurrent.atomic
jdk1.5提供了一组原子类,由CAS对其实现。
- AtomicBoolean,AtomicInteger,AtomicLong 基本类型
- AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray 数组类型
- AtomicReference,AtomicStampedReference,AtomicMarkableReference
AtomicReference为普通的引用类型原子类
AtomicStampedReference在构造方法中加入了stamp(类似时间戳)作为标识,采用自增int作为stamp,在stamp不重复的前提下可以解决ABA问题,AtomicStampedReference可以获知引用被更改了几次。
当我们不需要知道引用被更改几次仅需要知道引用是否被更改过,则可以使用AtomicMarkableReference,这个类用boolean变量表示变量是否被更改过。 - AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater用于对普通类进行原子更新。
其作用为对单一数据的操作实现原子化,无需阻塞代码,但访问两个或两个以上的atomic变量或对单个atomic变量进行2次或2次以上的操作被认为是需要同步的以便这些操作是一个原子操作。 - 从 AtomicInteger 入手,其中的属性 valueOffset 是该对象的 value 在内存中的起始地址。
2. CAS与volatile
2.1 CAS分析
前面看到的tomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
public void withDraw(Integer account) {
while (true){
//获取余额的最新值
int prev = balance.get();
int next = prev - account;
//真正修改,比较并设置
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
其中关键是compareAndSet,它的简称就是CAS,它必须是原子操作。
- 注意:
其实CAS的底层是 lock cmpchg 指令(X86结构),在单核CPU和多核CPU下都能够保证【比较-交换】的原子性 - 在多核状态下,某个核执行到带lock的指令时,CPU会让总线锁住,当这个核把此指令执行完成时,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
2.2 volatile
获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。
它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。即一个线程对volatile变量的修改,对另一个线程可见
- 注意:
volatile 仅仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错的问题。(不保证原子性)
2.3 为什么无锁效率高?
- 无锁情况下,即时重试失败,线程始终在高速运行,没有停歇,而synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
- 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火,启动,加速。。。恢复到高速运行,代价比较大。
- 但无锁情况下,因为线程要保持运行,需要额外CPU支持,CPU在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
- 多核CPU才能发挥优势
2.4 CAS特点
结合CAS和volatile 可以实现无锁并发,适用于线程数少、多核CPU的场景下。
- CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其他线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
- CAS体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
- 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受到影响。
3. 原子整数
JUC并发包提供了:
- AtomicBoolean
- AtomicInteger
- AtomicLong
以AtomicInteger为例
AtomicInteger i = new AtomicInteger(0);
//获取并自增(i= 0,结果 i = 1, 返回0),类似于i++
System.out.println(i.getAndIncrement());
//自增并获取(i = 1,结果 i = 2, 返回2),类似于 ++i
System.out.println(i.incrementAndGet());
//自减并获取(i = 2,结果i = 1,返回1),类似于 --i;
System.out.println(i.decrementAndGet());
//获取并自减(i = 1,结果i = 0,返回1),类似于 i--;
System.out.println(i.getAndDecrement());
//获取并加值(i = 0,结果 i = 5,返回0)
System.out.println(i.getAndAdd(5));
//加值并获取(i = 5,结果 i= 10,返回10)
System.out.println(i.addAndGet(5));
System.out.println(i.updateAndGet(x -> x * 5));
System.out.println(i.updateAndGet(x -> x - 5));
运算结果:
0
2
1
1
0
10
50
45
4. 原子引用
为什么需要原子引用类型?
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
有如下方法:
public class ABATest {
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException{
System.out.println("start");
String prev = ref.get();
Thread.sleep(1000);
System.out.println("end :" + ref.compareAndSet(prev,"C"));
System.out.println(ref.get());
}
}
4.1 ABA问题以及解决
ABA问题
public class ABATest {
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException{
System.out.println("start");
//这个共享变量被其他线程修改过?
String prev = ref.get();
other();
Thread.sleep(1000);
System.out.println("change A->C :" + ref.compareAndSet(prev,"C"));
System.out.println(ref.get());
}
private static void other(){
new Thread(()->{
System.out.println("change A->B :" + ref.compareAndSet(ref.get(),"B"));
}).start();
new Thread(()->{
System.out.println("change B->A :" + ref.compareAndSet(ref.get(),"A"));
}).start();
}
}
执行结果:
start
change A->B :true
change B->A :true
change A->C :true
C
主线程仅能判断出共享变量的值与最初值A是否相等,不能感知到这种从A改为B又改回A的情况,如果主线程希望:
只要其他线程动过了共享变量,那么自己的cas就算失败,这时,仅比较值是不够的。需要再加一个版本号
AtomicStampedReference
public class ABAAtomicStampedReferenceTest {
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException{
System.out.println("start");
String prev = ref.getReference();
int stamp = ref.getStamp();
other();
Thread.sleep(1000);
System.out.println("change A->C :" + ref.compareAndSet(prev,"C",stamp,stamp+1));
System.out.println(ref.getReference());
}
private static void other(){
new Thread(()->{
System.out.println("change A->B :" + ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),ref.getStamp()+1));
}).start();
new Thread(()->{
System.out.println("change B->A :" + ref.compareAndSet(ref.getReference(),"A",ref.getStamp(),ref.getStamp()+1));
}).start();
}
}
执行结果:
start
change A->B :true
change B->A :true
change A->C :false
A
AtomicMarkableReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:
A -> B -> A -> C,通过AtomicStampedReference,我们可以知道,引用变量中途被改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
public class AtomicMarkbleReferenceTest {
public static void main(String[] args) throws InterruptedException{
GarbageBag bag = new GarbageBag("装满了垃圾");
//参数mark可以看做一个标记,true表示垃圾袋装满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag,true);
System.out.println("start");
GarbageBag prev = ref.getReference();
System.out.println(prev.toString());
Thread.sleep(1000);
System.out.println("想换一支新垃圾袋");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"),true,false);
System.out.println("换了吗?" +success);
System.out.println(ref.getReference().toString());
}
}
class GarbageBag{
private String description;
public GarbageBag(String description) {
this.description = description;
}
@Override
public String toString() {
return "GarbageBag{" +
"description='" + description + '\'' +
'}';
}
}
执行结果:
start
GarbageBag{description='装满了垃圾'}
想换一支新垃圾袋
换了吗?true
GarbageBag{description='空垃圾袋'}
public class AtomicMarkbleReferenceTest {
public static void main(String[] args) throws InterruptedException{
GarbageBag bag = new GarbageBag("装满了垃圾");
//参数mark可以看做一个标记,true表示垃圾袋装满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag,true);
System.out.println("start");
GarbageBag prev = ref.getReference();
System.out.println(prev.toString());
new Thread(()->{
bag.setDescription("倒空垃圾袋");
boolean success = ref.compareAndSet(bag, bag,true,false);
}).start();
Thread.sleep(1000);
System.out.println("想换一支新垃圾袋");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"),true,false);
System.out.println("换了吗?" +success);
System.out.println(ref.getReference().toString());
}
}
class GarbageBag{
private String description;
public GarbageBag(String description) {
this.description = description;
}
public void setDescription(String description) {
this.description = description;
}
@Override
public String toString() {
return "GarbageBag{" +
"description='" + description + '\'' +
'}';
}
}
执行结果:
start
GarbageBag{description='装满了垃圾'}
想换一支新垃圾袋
换了吗?false
GarbageBag{description='倒空垃圾袋'}
5. 原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
有如下方法
public class AtomicArray {
public static void main(String[] args) {
demo(
()->new int[10],
(array)->array.length,
(array,index)->array[index]++,
(array)-> System.out.println(Arrays.toString(array))
);
demo(
()->new AtomicIntegerArray(10),
(array)->array.length(),
(array,index)->array.getAndIncrement(index),
(array)-> System.out.println(array)
);
}
/**
* 参数一:提供数组,可以是线程不安全数组或线程安全数组
* 参数二:获取数组长度的方法
* 参数三:自增方法,回传array,index
* 参数四:打印数组方法
* @param arraySupplier
* @param lengthFunction
* @param printConsumer
* @param <T>
*/
//supplier 提供者,无中生有 ()->结果
//function 函数 一个参数一个结果 (参数)->结果 BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没有结果 (参数)->void BiConsumer 两个参数
private static <T> void demo(Supplier<T> arraySupplier, Function<T, Integer> lengthFunction, BiConsumer<T,Integer> putConsumer,
Consumer<T> printConsumer){
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFunction.apply(array);
for (int i = 0; i < length; i++) {
//每个线程对数组做10000次操作
ts.add(new Thread(()->{
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array,j%length);
}
}));
}
ts.forEach(t->t.start());
ts.forEach(t-> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
执行结果:
[9632, 9645, 9624, 9611, 9605, 9609, 9620, 9621, 9625, 9625]
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
6. 字段更新器
- AtomicReferenceFieldUpdater
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域进行原子操作,只能配合volatile修饰的字段使用,否则会出现异常
Exception in thread "main" java.lang.IllegalArgumentException:Must be volatile type
public class AtomicField {
public static void main(String[] args) {
Student student = new Student();
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");
updater.compareAndSet(student,null,"张三");
System.out.println(student);
}
}
class Student{
volatile String name;//加上volatile
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
7. 原子累加器
public class AtomicAcc {
public static void main(String[] args) {
demo(
()->new AtomicLong(0),
(adder)->adder.getAndIncrement()
);
demo(
()->new LongAdder(),
(adder)->adder.increment()
);
}
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
List<Thread> ts = new ArrayList<>();
//4个线程,每个累加50万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost : " + (end - start) / 1000000 + "ns");
}
}
执行结果:
2000000 cost : 49ns
2000000 cost : 20ns
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Thread-0 累加Cell[0],而 Thread-1 累加Cell[1]...最后将结果汇总。这样他们在累加时操作不同的Cell变量,因此减少了CAS重试失败,从而提高性能
7.1 LongAdder 原理分析
源码
LongAdder有几个关键域
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells; //累加单元数组,懒惰初始化
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base; //基础值,如果没有竞争,则用CAS累加这个域
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy; //在cells创建或扩容时,置为1,表示加锁
原理之伪共享
其中Cell即为累加单元
//防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
//最重要的方法,用cas方式进行累加,prev 表示旧值,next表示新值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
得从缓存说起
缓存与内存的速度比较
从CPU到------------------------大约需要的时钟周期
寄存器 ------------------------ 1cycle(4GHz的CPU约为 0.25ns)
L1 ------------------------- 3~4cycle
L2 --------------------------10~20 cycle
L3 -------------------------- 40~45 cycle
内存 ------------------------- 120~240 cycle
因为CPU 与内存的速度差异很大,需要靠预读数据至缓存来提升效率
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是64byte(8 个long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同的核心缓存行中
CPU要保证数据的一致性,如果某个CPU核心更改了数据,其他CPU核心对应的整个缓存行必须失效。
8. Unsafe
8.1 概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得