Intro
JDK8 在并发工具包下增加了LongAdder、DoubleAdder
类,提供原子的增减功能。本文主要介绍一下LongAdder
,根据Doug Lea
的文档描述,该类在高并发的情况下,吞吐量会比AtomicLong
高很多,当然会牺牲一定的空间。
AtomicLong
JDK8以前JUC下面的原子类都是通过Unsafe类提供CAS的能力来实现的,而Unsafe类是由C来调用硬件级的原子指令实现的。AtmicLong的部分代码如下:
// setup to use Unsafe.compareAndSwapLong for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
Unsafe类部分代码:
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
getLongVolatile
方法是从内存中取到对应的值,然后尝试CAS更新,成功就返回,失败就不断重试直到成功。在高并发的情况下,会造成很大的开销。
LongAdder
先看一下该类的结构;
LongAdder继承自Striped64,其核心功能基本都是Striped64
实现的。介绍一下Striped64
的主要属性
- 基础值
base
- 内部类
Cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
@sun.misc.Contended
注解,可以解决伪共享的问题
- 数组
Cells
,数组大小为2的N次方,首次初始化为2 -
NCPU
控制数组的最大为cpu的核数
分析
add方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 第一次add() 直接调用casBase()。
//成功就结束,否则走到下面逻辑
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 设置竞争标识。true->目前没有竞争
boolean uncontended = true;
// as数组为空或者size为0
// 或者当前线程所分配的Cell为空(as数组大小为2的N次方,所以这里其实就是取模的操作a%b==a&(b-1))
// 或者cas更新Cell失败
//就执行longAccumulate()方法
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
longAccumulate方法
先看一下longAccumulate方法大概逻辑;
- 首先获取当前线程的
probe
如果没有初始化就初始化并设置wasUncontended==true
- 循环中的逻辑有三个大分支
- cells数组内有元素
- 尝试扩容
- 尝试用casBase()计算值
int h;
if ((h = getProbe()) == 0) {
//初始化当前线程的prob
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells数组内有元素
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程分配到的桶位 值为null 就对其赋值
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 这里casCellsBusy()防止了并发的安全问题
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 重新确认是否已赋值
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
// false 说明现在又线程在进行初始操作,自旋检查created标识
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// Cell尝试cas操作
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 扩容到最大容量 或者引用过期
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// false-> 重新自旋 ps.个人觉得是防止引用过期,然后再重试下
else if (!collide)
collide = true;
// 扩容,扩大两倍
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 尝试扩容
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
// 根据奇偶分配
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 尝试累加值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
总结
LongAdder的性能毋容置疑,主要的缺点就是它不能获取自增后的更新值。