阿里面试必问之CAS与LongAdder

简介

阿里面试的时候经常会问到高并发,解决并发的方案就是cas,也是AtomicLong这些原子类,那么如果问你除了Atomic这些原子类之外的​解法呢?

cas

java.util.concurrent.atomic 包下的原子操作类都是基于 CAS 实现的,CAS的全称是 Compare-and-Swap,也就是比较并交换。java.util.concurrent 包全完建立在CAS之上,没有CAS也就没有此包,可见CAS的重要性。
AtomicLong 类变量的定义:

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

关键属性:
1、Unsafe是CAS的核心类
2、valueOffset表示的是变量值在内存中的偏移地址,因为 Unsafe 就是根据内存偏移地址获取数据的原值的。
3、value 是用volatile修饰的。

Unsafe类,cmpxchg方法

简单科普一下unsafe和cmpxchg:
Unsafe类是CAS的核心,Java 无法直接访问底层操作系统,而是通过本地方法来访问。不过尽管如此,JVM 还是开了一个后门,JDK 中有一个类 Unsafe,它提供了硬件级别的原子操作。
Unsafe位置在:sun.misc.Unsafe
Unsafe是通过JNI调用到unsafe.cpp中,而unsafe.cpp中的实现本质都是调用CPU的cmpxchg指令,也就是说cmpxchg是cpu级别的cas。
unsafe.cpp位置:/OpenJDK/hotspot/src/share/vm/prims/unsafe.cpp
cmpxchg位置:/OpenJDK/hotspot/src/os_cpu/windows_x86/vm/atomic_windows_x86.hpp
简单了解下就可以了。

Atomic的实现

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
    }

public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {
            var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;
    }

getAndIncrement() 会无限重试,直到CAS执行成功后退出循环,由此可见在高并发下发生CAS失败的几率也就越大,会导致循环次数过多从而带来额外性能问题。

cas缺点

1.aba问题:
CAS操作容易导致ABA问题,也就是在做a++之间,a可能被多个线程修改过了,只不过回到了最初的值,这时CAS会认为a的值没有变。就是一个值从A变成了B又变成了A,使用CAS操作不能发现这个值发生变化了。
2.性能问题
我们使用时大部分时间使用的是 while true 方式对数据的修改,直到成功为止。优势就是响应极快,但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。

LongAdder

LongAdder并没有提供AtomicLong的getAnIncrement()或者IncrementAndGet()这样的原子操作,增加操作是increment()获取当前值是longValue(),增加和获取的操作是分离的,没有原子性,这是其跟AtomicLong的区别。

longValue()

    /**
     * Equivalent to {@link #sum}.
     *
     * @return the sum
     */
    public long longValue() {
        return sum();
    }

   public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

从longValue()方法可以看出,LongAdder的值是通过cell数组的value累加以及base的加和来算出来的,这么设计的意义何在?

Cell

cell数组是longAdder中的成员变量,cell也是Striped64的内部类。

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

longAdder的increment和decrement都是通过add()方法来实现的:

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }
    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

add方法先判断一下cells是否为空,当方法首次执行的时候肯定为空,因此会进入casBase()方法

    /**
     * CASes the base field.
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

casBase方法就是简单的CAS操作,当casBase执行成功的时候就不会进入if判断内部,直接结束方法,那什么时候casBase执行失败?就是高并发的情况下,AtomicLong的处理方式是死循环尝试更新,直到成功才返回,而LongAdder则是进入这个if里的逻辑。这里就可以看出当并发不大的时候只对base进行更新,获取值得时候只从base获取即可,这个时候其实和AtomicLong的原理几乎一模一样,区别就在于if里的的分支,继续往下看。

            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))

if里的四个条件是递进关系:
as == null ,(m = as.length - 1) < 0 ,这两个是判断cell数组是否为空;
a = as[getProbe() & m]) == null :这个条件是取cells数组中的任意一个元素a判断是否为空;
uncontended = a.cas(v = a.value, v + x) :这个条件是第三个条件进行之后,取cells数组中的任意一个元素a然后进行cas操作。
从这里可以看出结论,当cas竞争激烈到一定程度时,会对cells数组中某个随机元素进行更新。

结论

因为制约AtomicLong高效的原因是高并发,高并发意味着CAS的失败几率更高, 重试次数更多,越多线程重试,CAS失败几率又越高,变成恶性循环,AtomicLong效率降低。 LongAdder给了一个非常巧妙的解决方案: 减少并发,将单一value的更新压力分担到多个value中去,用空间换时间,分段更新。这样,线程数再多也会分担到不同的value上去更新,这样就能解决AtomicLong中的 恶性循环。 cells就是这个 “段” ,cell中的value 就是存放更新值的,当我需要总数时,只需要把cells 中的value都累加一下就可以了。这也是为什么longValue()方法需要sum一下cell数组与base的原因。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。