扯淡序言
在看ConcurrentHashMap源码对addCount(),fullAddCount()不是很理解。经过查询资料发现实现的逻辑跟LongAddr差不多。LongAddr是什么呢?有什么作用呢?让我们带着疑问一起走进这篇文章。
AtomicLong缺点
我们都知道AtomicLong是通过CAS自旋的方法去设置value,直到成功为止。那么当并发数比较多时,就会导致CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,形成恶性循环,从而降低了效率。
Striped64类源码解析
简介
Striped64是jdk1.8提供的用于支持如long,double原子性操作的基类。
Striped64的设计核心思路维护一组按需分配的计数单元,并发计数时,不同的线程可以在不同的计数单元上进行计数,这样减少了线程竞争,提高了并发效率。
Striped64内部包含一个base和一个cell数组。整个Striped64的值包括base和cell数组的value相加。
成员变量
* //存放Cell的hash表,大小为2的幂。
transient volatile Cell[] cells;
/**
* 基础值,没有竞争时会使用这个值,同时作为初始化table竞争失败的一种方案
* 也就是说没有竞争的时候会使用这个值,如果初始化table竞争失败也会使用这个值
*/
transient volatile long base;
//通过cas实现的锁,0无锁,1获得锁
transient volatile int cellsBusy;
//可用cpu数量
static final int NCPU=Runtime.getRuntime().availableProcessors();
Cell类源码解析
// @sun.misc.Contended这个用于避免伪共享,缓冲行填充
@sun.misc.Contended static final class Cell {
//定义一个原子类型变量
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//value变量的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
longAccumulate()解析
/**
* @param x 值
* @param fn 更新方法
* @param wasUncontended 竞争标识,标识在调用这个方法之前是否发生竞争。false发生竞争
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//获取线程probe值
if ((h = getProbe()) == 0) {
//如果probe值为0,强制初始化当前线程的probe值。
ThreadLocalRandom.current();
//再次获取线程probe值。
h = getProbe();
//没有发生竞争
wasUncontended = true;
}
//是否碰撞,false碰撞
//如果最后一个槽为不为空才为true
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
//如果cells数组不等于null
if ((as = cells) != null && (n = as.length) > 0) {
//通过h从cell表中选定一个cell位置。
if ((a = as[(n - 1) & h]) == null) {
//如果当前位置没有cell,尝试新建一个Cell对象。
//获取锁
if (cellsBusy == 0) {
//创建一个Cell对象
Cell r = new Cell(x);
//获取cellsBusy锁
if (cellsBusy == 0 && casCellsBusy()) {
//cell创建标识
boolean created = false;
try {
Cell[] rs; int m, j;
//进入这里说明已经获取到锁,再次检测一下。
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//设置新建的cell到指定位置。
rs[j] = r;
created = true;
}
} finally {
//释放cellsBusy锁
cellsBusy = 0;
}
if (created)
//如果创建成功,直接跳出循环,退出方法。
break;
//说明没有创建成功,则继续尝试。槽不是空的
continue;
}
}
//执行到这里,说明获取cellsBusy锁失败。碰撞标识设置为false
collide = false;
}
//执行到这里说明cell位置不为空,就是等于a
else if (!wasUncontended)
//设置未竞争标志位true,然后重试。
wasUncontended = true;
//通过cas将x值加到a的value上。
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//成功退出循环
break;
else if (n >= NCPU || cells != as)
//说明cells表大于可用cpu数量,或者as数组过时;设置碰撞标识为false
collide = false;
else if (!collide)
//碰撞标识,设置为true
collide = true;
//尝试获取cellsBusy锁。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//校验as是否过时
if (cells == as) {
//cell数组扩容
Cell[] rs = new Cell[n << 1];
//老数组拷贝给新数组
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
//扩容cell表后,再次重试。
collide = false;
continue;
}
//重新计算hash值。xorshift算法
h = advanceProbe(h);
}
//初始化cell表,先尝试获取cellsBusy锁。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//初始化标识
boolean init = false;
try {
if (cells == as) {
//初始化cell表,初始容量为2。
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
//初始化成功,退出循环
if (init)
break;
}
//创建cell表由于竞争导致失败,尝试将x累加到base上。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
LongAdder源码解析
类结构关系图
从类关系图可以看出LongAddr继承Striped64
add()解析
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//校验数组是否为null或者通过cas修改base的值是否成功
if ((as = cells) != null || !casBase(b = base, b + x)) {
//是否竞争标识,uncontended=true没有表示发生竞争
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//数组未初始化,或者数组某一index位置为null,或者cas修改失败
//longAccumulate已讲
longAccumulate(x, null, uncontended);
}
}
sum()解析
//sum的逻辑就是base+cells数组里面的值
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
reset()解析
//cells数组的值都置为0
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
LongAdder是否能够替换AtomicLong
个人觉得是不行的,因为AtomicLong提供了很多cas方法,操作灵活,而LongAdder只有add和sum,使用起来比较受限。
AtomicLong:JVM将64位的double,long 型变量的读操作分为两次32位的读操作,所以低并发保持了AtomicLong性能。
LongAdder:高并发下热点数据被hash到多个 Cell,通过分散提升了并行度。