LongAdder简单介绍
在原子操作类AtomicLong中,在高并发的情况,会出现大量的线程去争抢更新同一个原子变量,但是同时只能有一个线程CAS操作成功,这就会出现大量线程争抢失败,然后通过自旋进行重试,这就会消耗CPU的资源;这个时候就出现了LongAdder原子操作类;它主要就是将这里的原子变量复制成多个,然后多个变量由线程去争,结束的时候,再进行多个变量值的计算;
先了解一下LongAdder的结构从上图可以看出LongAdder是继承了Striped64这个类;其中定义了一个Cell类型的数组,Cell的具体结构如下:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
1、该类使用了@sun.misc.Contended注解,是为了解决伪共享问题; 2、该类内部定义了一个volatile修饰的long类型的value变量; 3、该类内部定义了一个cas(long cmp, long val)方法; 4、该类内部通过sun.misc.Unsafe类获取该类中value变量的偏移量;
因为每个Cell内部都维护了一个变量,所以在高并发情况下,不同的线程,会访问不同的Cell内部的long类型变量,而且当某一个线程在变更某一个Cell中的变量时,并不会在此Cell上进行自旋重试,而是尝试在其他的Cell变量上重试;最后将所有的value变量值sum后再加上base返回;
接下来简单了解一下LongAdder中比较重要的两方法add(long x)和longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
- void add(long x)
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//(1)
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//(2)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
(1) 中判断cells数组是否为null,CAS操作base的值是否成功;一般的在并发不高的情况下,都是通过CAS操作直接修改base的值来完成;在并发高的情况下,casBase(b = base, b + x)方法可能会出现失败,这个时候就会使用cells数组;
(2) 中一共有三个判断
情况1、as == null || (m = as.length - 1) < 0; 这里的as是对cells的引用,如果成立表示此时cells数据还未进行初始化;这里的m是cells数组长度-1,如果m<0成立,证明此时cells数组的长度小于1,则可以理解此时cells数据还未初始化,然后进入到了longAccumulate方法中;
情况2、(a = as[getProbe() & m]) == null; 这里的getProbe()方法是获取当前线程的hash值,a的值是当前线程的hash值在cells数组中对应位置的值也就是Cell,如果为null,则证明当前线程还未对cells数组中的元素进行操作,然后进入到了longAccumulate方法中;
static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
情况3、!(uncontended = a.cas(v = a.value, v + x)); 这里的a为当前线程对应的Cell, 然后对应的CAS操作当前的Cell中的value值,如果不成功,则进入到longAccumulate方法中
- void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//(3)
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//(4)
if ((as = cells) != null && (n = as.length) > 0) {
//(4.1)
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//(4.2)
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//(4.3)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//(4.4)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//(4.5)
else if (!collide)
collide = true;
//(4.6)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//(5)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//(6)
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
该方法结合(2)中的三种情况来分析
情况1进入:此时cells数组还未初始化,执行到(3)的时候h = getProbe()为当前线程的hash值,因为当前线程还未对threadLocalRandomProbe值做任何操作,所以它肯定是0;所以(h = getProbe()) == 0;成立! 然后调用ThreadLocalRandom.current();
public static ThreadLocalRandom current() { if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0) localInit(); return instance; }
static final void localInit() { int p = probeGenerator.addAndGet(PROBE_INCREMENT); int probe = (p == 0) ? 1 : p; // skip 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); Thread t = Thread.currentThread(); UNSAFE.putLong(t, SEED, seed); UNSAFE.putInt(t, PROBE, probe); }
到这里才对threadLocalRandomProbe进行了操作,然后在h = getProbe();获取最新的值; 这个时候因为as == cells == null,所以直接到(5);在(5)中cellsBusy是一个自旋锁,当等于0时,证明是无锁状态,可以获取锁;cells == as判断当前的cells是不是没有变动;然后再通过casCellsBusy()方法进行加锁,将cellsBusy赋值为1;在(5)的内部,先初始化一个大小为2的Cell[]数组,然后将通过位于运算将第0个或者第一个1元素的值设置为x的值,然后将这个新的数据赋值给cells数组,这个时候cells就不是空的了;最后释放锁,跳出循环结束;
情况二进入:此时cells数组肯定不为空,但是当前线程在cells中还没有设置过一个Cell元素;这个时候就到了(4)这一步,这个时候执行a = as[(n - 1) & h]==null判断,这里的h是当前线程调用了ThreadLocalRandom.current()之后的hash值,在做一次判断是否为null;
假设(4.1)成立;执行cellsBusy==0获取锁,尝试去new一个Cell对象,并且赋值为x;然后在尝试获取锁,并且通过CAS修改锁的状态,最后再次判断当前cells是否为空,当前线程是否没有对应的Cell元素,然后将新new的元素赋值给当前这个线程在cells数组中的对应位置的元素;最后释放锁结束;
假设(4.1)不成立;证明当前线程有对应的Cell元素,这个时候的wasUncontended=true;所以不会进入(4.2),所以看(4.3),此时的fn是等于null的,所以(4.3)就是a.cas(v = a.value, v + x),也就是将当前线程对应的Cell对象中的原子变量进行CAS操作,如果CAS成功结束循环;
假设(4.3)不成立;在(4.4)中设置collide = false;或者(4.5)中设置collide = true后,执行h = advanceProbe(h);重新设置当前线程的hash值,然后继续下一轮循环;
假设(4.6)成立;这里是是将cells数组进行扩容,然后继续下一轮循环;
情况三进入:此时对应线程是有cell元素的,只是cas失败了;然后进入(4.3)或者(4.6)继续循环;
代码(6)中是直接进行CAS操作base值;