关键字:Accumulator 没能获得预期值,BinaryOperator 表达式如何写,Accumulator 出错
以下都以 Double 类型的说明
基本使用
LongAccumulator 和 DoubleAccumulator 是 Java 提供的用于为多线程提供原子值的一个类,可以视作一个能保证线程安全的 long 类型和 double 值来用
DoubleBinaryOperator doubleBinaryOperator = (a, b) -> {
return a * b;
};
// 创建一个 DoubleAccumulator 并设置初始值
DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator,2f);
// 根据 上面提供的 operator 运算,这里也就是 doubleBinaryOperator提供的值=2*3 变为6
doubleAccumulator.accumulate(3);
// 返回 doubleAccumlator 提供的值
int aaaaaa=doubleAccumulator.get();
参数 DoubleBinaryOperator
在 Java核心技术,关于LongAccumulator 和 DoubleAccumulator 的介绍中,
DoubleAccumulator 的实现方式,简单的说就是在内部提供了一组变量
identity ,a1, a2 ,a3 ···
调用 doubleAccumulator.accumulate(x)
线程不冲突的时候 , 直接 identity = doubleBinaryOperator (identity ,x)
线程冲突的时候,根据线程选择一个变量 a1= doubleBinaryOperator (a1,x),若 a1 没有被使用过,直接把 x 赋值给 a1 作为 a1 的初始值
在需要doubleAccumulator 提供值得时候也就是 调用 doubleAccumulator.get(); 的时候
提供 identity op a1 op a2 op a3 ...
即
result = identity;
result= doubleBinaryOperator (result,a1);
result= doubleBinaryOperator (result,a2);
result= doubleBinaryOperator (result,a3);
····
return result;
根据 以上的逻辑,那么 DoubleBinaryOperator 的实现的运算规则,是收到一定限制的
如果 运算规则 是 a*b
未产生线程冲突下
doubleAccumulator.accumulate(p1);
doubleAccumulator.accumulate(p2);
identity = identity*p1;
identity = identity*p2;
计算 result 的时候相当于
result =identity *p1 *p2;
线程冲突,使用 a1
a1=p1;
a1=p1*p2;
计算 result 的时候相当于
result=identity* (p1*p2);
------------------------------------------------------------
如果 运算规则 是 a-b
未产生线程冲突下
doubleAccumulator.accumulate(p1);
doubleAccumulator.accumulate(p2);
identity = identity-p1;
identity = identity-p2;
计算 result 的时候相当于
result =identity -p1 -p2;
线程冲突,使用 a1 // 这个 a1 其实就是源码中的 cell
a1=p1;
a1=p1-p2;
计算 result 的时候相当于
result=identity- (p1-p2);
result值为 identity-p1+p2
并不会获得我们期望的数值
--------------------------------------
所以一定要考虑好如何写 运算规则
源码
DoubleAccumulator.class
public DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,
double identity) {
this.function = accumulatorFunction;
// 构造方法,在这里可以看到 初始值得赋予
base = this.identity = Double.doubleToRawLongBits(identity);
}
/**
* Updates with the given value.
*
* @param x the value
*/
public void accumulate(double x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = Double.doubleToRawLongBits
(function.applyAsDouble
(Double.longBitsToDouble(b = base), x))) != b && !casBase(b, r)) {
// as 非空的时候直接查数组,看有没有线程对应的 cell
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = Double.doubleToRawLongBits
(function.applyAsDouble
(Double.longBitsToDouble(v = a.value), x))) == v ||
a.cas(v, r)))
// 找不到的时候就进入此方法 为线程构建一个它对应的 cell,此方法实现见下端
doubleAccumulate(x, function, uncontended);
}
}
/**
* Returns the current value. The returned value is <em>NOT</em>
* an atomic snapshot; invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the value is being calculated might not be
* incorporated.
*
* @return the current value
*/
public double get() {
Cell[] as = cells; Cell a;
double result = Double.longBitsToDouble(base);
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
// 把 as 遍历,挨个做 applyAsDouble ,就是自己写的 运算规则
result = function.applyAsDouble
(result, Double.longBitsToDouble(a.value));
}
}
return result;
}
final void doubleAccumulate(double x, DoubleBinaryOperator fn,boolean wasUncontended) {
// 省略的其他代码
// 在这里可以看到,新建 Cell 的时候,就是直接 把 x 赋做初值 ,对应上面 a1=p1;
Cell r = new Cell(Double.doubleToRawLongBits(x));
}
测试
测试思路:
两组使用相同的初始值 ,和运算法则
组 1 ,使用 DoubleAccumulator,用两个线程 ,总共循环计算 2*times 次
组 2 ,使用 普通 double 类型,单线程 ,循环 计算 2*times 次,这个计算结果一定是我们期望的
调整 times 的数值,当 times 较小的时候,不容易产生同步问题,就是一直在使用 identity 做计算,可以看到 最后两组计算结果相同
当 times 较大时,线程冲突,使用了其他变量,可以看到 最后两组计算结果不相同
// 初始值
Double x=2.0d;
// 每个线程循环次数
int times=500000;
// 赋一个差值。让两个线程执行不同次数
int chazhi=-490000;
// 运算方式
DoubleBinaryOperator doubleBinaryOperator = (a, b) -> {
//return (a*b>999999||a*b<0.0000011)?a:a*b;
return a - b;
};
DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator,2.0d);
new Thread(()->{
for (int i = 0;i<times+chazhi; i++) {
// if (Double.isNaN(doubleAccumulator.get())||Double.isInfinite(doubleAccumulator.get())){
// System.out.println("t1 停止了");
// break;
// }
// if (doubleAccumulator.get()>99999f){
// System.out.println(doubleAccumulator.get()+"------t1 跳过");
// continue;
// }
doubleAccumulator.accumulate(x);
//System.out.println("t1第"+i+"次:"+doubleAccumulator.get());
}
System.out.println("t1 结束循环了 "+doubleAccumulator.get());
}).start();
new Thread(()->{
for (int i = 0;i<times-chazhi; i++) {
// if (Double.isNaN(doubleAccumulator.get())||Double.isInfinite(doubleAccumulator.get())){
//
// System.out.println("t2 停止了");
// break;
// }
// if (doubleAccumulator.get()<0.0001f){
// System.out.println(doubleAccumulator.get()+"------t2 跳过");
// continue;
// }
doubleAccumulator.accumulate(x);
//System.out.println("t2第"+i+"次:"+doubleAccumulator.get());
}
System.out.println("t2 结束循环了 "+doubleAccumulator.get());
}).start();
Double a= 2.0d;
for (int i = 0; i < times*2; i++) {
a=doubleBinaryOperator.applyAsDouble(a,x);
}
Thread.sleep(3000);
System.out.println("a的值----------- "+ BigDecimal.valueOf(a));
System.out.println("l的值----------- "+BigDecimal.valueOf(doubleAccumulator.get()));
运算规则为 a+b 的时候,无论如何 a , l 值相同
运算规则为 a-b 的时候
尝试在 doubleAccumulate(x, function, uncontended); 这个位置打断点
就会发现,只要在此位置触发了断点, a 和 l 的值就出现不同,因为使用了 cell ,这种运算规则在这种逻辑下不能满足我们的预期,最后计算结果的时候就出错了