参考:https://juejin.im/post/5d4bde1e5188250541791e45
LongAdder是JDK 1.8 新增的原子类,基于Striped64实现。 从官方文档看,LongAdder在高并发的场景下会比AtomicLong 具有更好的性能,代价是消耗更多的内存空间:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
Striped64
设计思想:分散热点,将value值分散到一个数组中,不同线程会被计算到数组的不同位置,各个线程只对自己位置中的那个cell进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个cell中的变量值累加。
abstract class Striped64 extends Number {
// cell类,使用cas算法存储值
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
}
// 系统的CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell数组
transient volatile Cell[] cells;
// 基准值,当没有线程竞争的时候,使用它
transient volatile long base;
// cellsBusy充当一个自旋锁,默认是0,当进行create cell和resize的时候会将它置1
transient volatile int cellsBusy;
// CAS更新Base
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// CAS更新 cellsBusy
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 核心方法
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 线程随机值有没有初始化
if ((h = getProbe()) == 0) {
// 0就代表没有初始化,强制将当前线程初始化 probe=1
ThreadLocalRandom.current();
// 默认初始化之后 h = 1
h = getProbe();
wasUncontended = true;
}
// True if last slot nonempty
boolean collide = false;
for (; ; ) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
// cells不是null 并且已经初始化了
// 找到当前线程的cell应该放的位置
if ((a = as[(n - 1) & h]) == null) {
// index = (length-1) & h
// 位置是null,没有线程占用这个位置,尝试创建一个cell
if (cellsBusy == 0) {
// Optimistically create 对象逃逸
Cell r = new Cell(x);
// 修改 cellsBusy 为 1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
// 将cell放入数组的时候再次检查一遍
// 目的:如果当前线程在new Cell的时候,其他线程初始化了rs[index]的cell,那么这里就需要检查一下
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
// 如果其他线程已经把位置占了,则会跳过创建,进行下一次循环(扩容hash或者修改cell的值)
} finally {
cellsBusy = 0;
}
if (created){
break;
}
continue; // Slot is now non-empty
}
}
// cellsBusy已经被其他线程修改(其他线程在对cells进行扩容) 修改cellsBusy失败
// 重新hash
collide = false;
} else if (!wasUncontended){ // CAS already known to fail
// CAS失败之后重新hash
wasUncontended = true; // Continue after rehash
}else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))){
// 如果index的cell不是null,则使用cas去修改这个cell的值
// 修改成功退出
break;
}else if (n >= NCPU || cells != as){
// At max size or stale cell数组最大等于CPU数量 或者是 cells已经被其他线程扩容了
// 再次循环 重新hash
collide = false;
}else if (!collide){
// 再次hash
collide = true;
}else if (cellsBusy == 0 && casCellsBusy()) {
// 当前index的cell不是空,并且cas修改cell的值失败了,但是这里表示当前没有线程去操作cells
// 本线程会尝试将 CellsBusy 置 1
try {
// Expand table unless stale
if (cells == as) {
// 将cells扩容 *2
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i){
rs[i] = as[i];
}
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 修改当前线程的probe 如果probe=1 -> 计算后为 270369 就等于是一个散列函数
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// cells没有在初始化 + cells==null + 将cellsBusy设置为1
boolean init = false;
// Initialize cells
try {
// 再次检查是null
if (cells == as) {
// 初始化cells 长度是2
Cell[] rs = new Cell[2];
// 默认的h=1 rs[1] = new Cell(x);
rs[h & 1] = new Cell(x);
// 初始化完成
cells = rs;
init = true;
}
} finally {
// 标记置为0
cellsBusy = 0;
}
// 结束循环
if (init){
break;
}
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))){
// 如果这时cellsBusy不等于0或者 cells数组已经初始化 那么就会去CAS Base + X 如果失败继续循环,成功就退出
break; // Fall back on using base
}
}
}
}
修改cellsBusy的地方
1、向cells中添加cell时修改cellsBusy
2、扩容的时候修改cellsBusy
3、初始化的时候修改cellsBusy
longAccumulate方法的主要逻辑:
如果在cells数组未初始化并且CAS设置cellsBusy失败,则使用CAS去修改base值,成功则退出,失败则继续循环
如果在cells数组未初始化并且CAS设置cellsBusy成功,则初始化cells数据,初始长度为2,当前创建的cell放在cells[1]位置
如果cells数组初始化完成,根据线程的probe值计算出cell的索引,index=(cells.length -1) & probe。
-
如果索引位置不存在cell,则使用CAS修改cellsBusy,向指定位置添加cell
问题1:如果当前的cellsBusy不为0,进行重新hash,但是probe不会改变
问题2:如果当前线程CAS失败,同样进行重新hash,probe不变
问题3:如果当前线程CAS成功,但是索引位置已经存在cell了,将cellsBusy置为0,重新hash,probe不变
wasUncontended是这个方法传入的参数,如果传入的false表示cas失败,则将它置为true,继续循环
如果index位置的cell不是null,则使用cas去修改这个cell的值,修改成功则退出
如果CAS修改cell的值失败了,并且cells长度等于CPU数量或者cells数组发生了变化,则进入下一次循环
如果CAS修改cell的值失败了,并且cells数组没有发生了变化,这里就会尝试修改cellsBusy,修改成功则开始去*2扩容,扩容完成后修改当前线程的probe,然后再次循环算index
如果CAS修改cellsBusy失败,则修改线程的probe,再次循环
LongAdder
LongAdder如何利用Striped64?
LongAdder中核心调用Striped64的longAccumulate方法。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 1、如果cells为null,则尝试修改base值
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 2、cells不为null或者修改base出现竞争
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x))){
// 修改base值 初始化 填入cell 修改cell 扩容
longAccumulate(x, null, uncontended);
}
}
}
public long sum() {
Cell[] as = cells; Cell a;
// 拿出base
long sum = base;
if (as != null) {
// 把cells数组中的值加起来
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null){
sum += a.value;
}
}
}
return sum;
}
LongAdder相比于AtomicLong的优点
LongAdder在低并发的场景下与AtomicLong的差别不大。在高并发下的某些场景下LongAdder性能要优于AtomicLong
主要在高并发的场景下,LongAdder的吞吐量要优于AtomicLong,但是相应也要耗费更多的空间。
比如:多个线程同时更新一个求和的变量;统计集合的数量,但是不能用于细粒度同步控制(也就是这个统计存在一定的误差,因为读取和 写入是并行的,ConcurrentHashMap中也利用了Striped64的设计思想)
具体测试文档:http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/
摘录部分内容:“虽然AtomicLong在单线程的情况下要快一些,但它很快就输给了LongAdder,在两个线程的情况下,它的速度要慢4倍,而在线程与机器核心匹配的情况下,它的速度要慢5倍。更令人印象深刻的是,LongAdder的性能是恒定的,直到线程的数量超过CPU的物理内核的数量(在本例中为4)。”
LongAccumulator
LongAccumulator可以看作是对LongAdder的一个增强。
/**
*
* @param accumulatorFunction 运算接口,传入两个long值,然后返回这两个值的计算结果
* @param identity 累加器的初始值
*/
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
LongBinaryOperator的作用就是在Striped64#longAccumulate方法中,在对cell进行操作时,如果存在fn,那么就会调用 fn#applyAsLong 来计算x和原来值的结果,默认是将两个值直接相加。
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))){
// 如果index的cell不是null,则使用cas去修改这个cell的值
// 修改成功退出
break;
}
总结
1、LongAccumulator相比于LongAdder,可以为累加器提供非0的初始值,而LongAdder只能提供默认的0值。
2、LongAccumulator还可以指定算术规则,比如累加或者相乘,只需要在构造LongAccumulator时,传入自定义的运算器即可,LongAdder内置累加算术规则。