目录
介绍
JUC是并发中非常重要的一个包,接下来针对这个包做一个详细的介绍。
结构如下:
- atomic包
- locks包
- 其他类
可以看到结构非常简单,两个包加一些类。包里面的内容也没多少,这一章节主要讲一下atomic包。
atomic包下主要有Integer、Boolean、Long、Reference(array)等,其每一种又各包含不同功能的类
我们拿Long来介绍,其他不外乎这三种。
AtomicLong
AtomicLong属性
- unsafe:这是一个Unsafe的静态实例,JUC包一系列原子类操作都是基于此
- value:实际变量值,volatile Long类型
- valueOffset: value的偏移地址
- VM_SUPPORTS_LONG_CAS:JVM的支持判断(可以不用管)
原子类操作有一个比较通用的静态代码块,就是加载地址偏移,后面介绍的LongAdder、LongAccumulator也是一样。
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
从上面可以看到,属性非常少,所以其原理也很简单,当然因为Long本来就简单,下面直接介绍它的一些方法:
AtomicLong方法
-
第一类是直接操作如:
public final boolean compareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); } public final boolean weakCompareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); }
底层逻辑就是CAS
-
设置一个新值,并返回上一个值,有以下:
public final long getAndSet(long newValue) { return unsafe.getAndSetLong(this, valueOffset, newValue); } public final long getAndIncrement() { return unsafe.getAndAddLong(this, valueOffset, 1L); } public final long getAndDecrement() { return unsafe.getAndAddLong(this, valueOffset, -1L); } public final long getAndAdd(long delta) { return unsafe.getAndAddLong(this, valueOffset, delta); } public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } public final long getAndUpdate(LongUnaryOperator updateFunction) { long prev, next; do { prev = get(); next = updateFunction.applyAsLong(prev); } while (!compareAndSet(prev, next)); return prev; } 还有其他的方法,不一一列举
上面的方法都可以归为一类,因为底层的实现都是unsafe的方法且操作都是一个while循环进行cas操作,成功了再返回。最后一个看起有地不一样,但如果有看过java8的函数式接口就应该不难理解。
LongUnaryOperator就是一个函数式接口,定义一个函数,然后操作前一个值,得到目标值。因此getAndUpdate可以实现定制化的操作。
AtomicLong实例
计算多个数组中0的个数
/**
* <测试原子操作:参考《Java并发编程之美》
*/
public class AtomicTest
{
private static AtomicLong atomicLong = new AtomicLong();
private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};
public static void main(String[] args) throws InterruptedException
{
// 线程one统计数组arrayone中0的个数
Thread threadOne=new Thread(()->{
int size=arrayOne.length;
for (int i = 0; i < size; i++)
{
if(arrayOne[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
});
// 线程two统计数组arraytwo中0的个数
Thread threadTwo=new Thread(()->{
int size=arrayTwo.length;
for (int i = 0; i < size; i++)
{
if(arrayTwo[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
});
threadOne.start();
threadTwo.start();
threadOne.join();
threadTwo.join();
System.out.println("count 0 : "+atomicLong.get());
}
}
输出: count 0 : 7
LongAdder
到这里AtomicLong原子类操作已经比阻塞同步器来说已经好很多了,但是jdk开发组仍不满足,我们也看到,在第二种方法时我们进行了一个while循环来自旋操作,当多个线程竞争同一个变量时,会造成CPU资源浪费。
LongAdder就是被设计来解决多个线程竞争同一个变量的问题,它把一个变量拆分成了多个变量,使用Cell自动扩容数组来存储部分数据,让多个线程去竞争多个资源,这样就解决了性能瓶颈。
如图:
LongAdder 在内部维护了一个Cell元素数组,来分担单个变量进行争夺的资源占用。
Striped64
LongAdder继承自Striped64,Striped64内部维护以下变量:
- volatile Cell[] cells :LongAdder实际值为数组中所有元素的值+base
- volatile long base : 每次操作首先进行base的CAS如果失败则分配一个Cell
- volatile int cellsBusy :用来实现cells数组的自旋锁,修改的时候锁住,状态值只有0或者1,1表示不允许修改
- static final long PROBE:线程中threadLocalRandomProbe的偏移
- static final sun.misc.Unsafe UNSAFE:和AtomicLong一样
- int NCPU : CPU的个数,Cell不会无限增长,只有个数小于等于CPU的个数才会有性能的提升
除了Striped64维护的几个属性,LongAdder本身只增加了函数,没有增加属性
再看内部类Cell:
@sun.misc.Contended // 此注解使用字节填充防止伪共享
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
由上面可以看到内部类Cell非常简单,一个value值,一个CAS操作,保证value是原子操作
继续看Striped64的其他几个方法:
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended)//核心方法
除了最后一个longAccumulate函数比较复杂我们后面再讲,其他都非常好理解,就是简单的CAS操作
LongAdder方法
LongAdder方法可以归为三类,一类是返回当前的值或重置,一类是数据加、减,还有一类则是序列化(此章节忽略)
第一类如下:
- public long sum() 这个函数比较简单,就是返回所有Cell的元素和base的和,注意这个计算不是一个近实时的值,因为在计算过程中每个Cell会改变
long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
- void reset() 将base和Cell全都置为0
public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } }
- long sumThenReset() 这个函数是上面两个函数的叠加
- public String toString() {return Long.toString(sum());}
- public long longValue() { return sum();}
- public int intValue() {return (int)sum();}
- 还有floatValue、doubleValue和以上一样
可以看出上面这一类非常容易理解,就是普通数组的操作
重点是第二类:
- public void add(long x)
- public void increment() { add(1L);}
- public void decrement() {add(-1L);}
核心函数add源码:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
上面函数逻辑不难,就是各种条件判断看着有迷。这里有一张图:
要点:
- 如果cells还没创建,则尝试修改base的值,这部分跟AtomicLong操作一样,不过不同的是,AtomicLong会自旋不断尝试,而这里则会进入longAccumulate操作cells数据
- 如果cells不为空,则尝试对当前线程映射到的Cell元素进行赋值,如果成功则结束,如果不存在或者CAS失败则调用longAccumulate。
- uncontended为true 代表当前线程映射的Cell元素没有值,为false,代表当前线程映射的Cell元素已经有其他线程占用了
接着我们重点研究longAccumulate代码:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 初始化探针值threadLocalRandomProbe,在并发中很多地方用到这个属性,我的理解是这个值并没有实际意义,因为可能线程中多个地方会改变它(比如线程中同时使用了LongAdder、ThreadLocalRandom),它的意义在于遇到线程竞争的时候初始化或重置一个值。
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {// 熟悉的判断,cells存在且有内容
if ((a = as[(n - 1) & h]) == null) {// 如果当前线程映射到此的Cell不存在①
if (cellsBusy == 0) { // cells的锁能修改,则尝试创建一个Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break; // 如果创建成功则跳出循环
continue; // ②不成功代表cells正在被修改比如初始化、扩容、或者新增一个Cell
}
}
collide = false;
}
// 当前线程映射到此的Cell存在 ③
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 执行CAS fn.applyAsLong(v, x)这个函数后面会讲,成功就退出循环,失败继续往下
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 判断Cell的个数是否达到CPU的个数,如果达到,就不能再扩容,继续尝试CAS
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 如果没达到,则锁住cells进行复制扩容④
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 进行到这,说明存在对应的Cell,但有冲突,因此进行扩容并重新生成threadLocalRandomProbe⑤
}
// 拿到cells锁并初始化cells⑥
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 尝试CAS base。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
这一整段代码下来还是挺复杂的,主要是比较多的分支,和casCellsBusy判断,下面总结一下要点:
- 如果当前线程映射到此的Cell存在,但如果有冲突,也就是别的线程也映射到此了,则进行扩容和修改线程的探针值(上限是CPU的数量),否则不断尝试CAS如③、④、⑤;
- 如果当前线程映射到此的Cell不存在,则尝试创建一个新的Cell并设置值如①
- 如果连cells都为空,则进行初始化如⑥
- 如果以上都失败,也就是操作被占用,则尝试对casBase
个人想法:
- 代码⑥处,个人感觉cells==as是否有点多余,因为后面拿到锁后还会判断
- 代码②处,在扩容、初始化时利用cellsBusy锁住整个cells能理解,但是创建一个新Cell插入进去,也锁住了整个cells是否能够继续优化?
LongAdder实例
和上面的使用方式差不多
public class LongAdderTest
{
private static LongAdder LongAdder = new LongAdder();
private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};
public static void main(String[] args) throws InterruptedException
{
// 线程one统计数组arrayone中0的个数
Thread threadOne=new Thread(()->{
int size=arrayOne.length;
for (int i = 0; i < size; i++)
{
if(arrayOne[i].intValue()==0){
LongAdder.increment();
}
}
});
// 线程two统计数组arraytwo中0的个数
Thread threadTwo=new Thread(()->{
int size=arrayTwo.length;
for (int i = 0; i < size; i++)
{
if(arrayTwo[i].intValue()==0){
LongAdder.increment();
}
}
});
threadOne.start();
threadTwo.start();
threadOne.join();
threadTwo.join();
System.out.println("count 0 : "+LongAdder.sum());
}
}
LongAccumulator
LongAccumulator其实和LongAdder很相似,可以说LongAdder只是LongAccumulator一个操作加减的特例。
构造函数如下:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
发现它是可以定制方法的而且是有初始值的,我们只需传入一个方法(函数式接口)原理如下:
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
longAccumulate(x, function, uncontended);
}
}
与LongAdder的add()函数相比,它只有function不一样,LongAdder里是null,而这里使我们构造的时候传入的。
再回到longAccumulate 代码中,
else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
break;
真正更新的值不再是v+x 而是使用我们定义的函数进行计算。