LongAdder简单介绍

LongAdder简单介绍

在原子操作类AtomicLong中,在高并发的情况,会出现大量的线程去争抢更新同一个原子变量,但是同时只能有一个线程CAS操作成功,这就会出现大量线程争抢失败,然后通过自旋进行重试,这就会消耗CPU的资源;这个时候就出现了LongAdder原子操作类;它主要就是将这里的原子变量复制成多个,然后多个变量由线程去争,结束的时候,再进行多个变量值的计算;

先了解一下LongAdder的结构
1647521009246.jpg

从上图可以看出LongAdder是继承了Striped64这个类;其中定义了一个Cell类型的数组,Cell的具体结构如下:

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

1、该类使用了@sun.misc.Contended注解,是为了解决伪共享问题; 2、该类内部定义了一个volatile修饰的long类型的value变量; 3、该类内部定义了一个cas(long cmp, long val)方法; 4、该类内部通过sun.misc.Unsafe类获取该类中value变量的偏移量;

因为每个Cell内部都维护了一个变量,所以在高并发情况下,不同的线程,会访问不同的Cell内部的long类型变量,而且当某一个线程在变更某一个Cell中的变量时,并不会在此Cell上进行自旋重试,而是尝试在其他的Cell变量上重试;最后将所有的value变量值sum后再加上base返回;

接下来简单了解一下LongAdder中比较重要的两方法add(long x)和longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)

  • void add(long x)
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        //(1)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //(2)
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

(1) 中判断cells数组是否为null,CAS操作base的值是否成功;一般的在并发不高的情况下,都是通过CAS操作直接修改base的值来完成;在并发高的情况下,casBase(b = base, b + x)方法可能会出现失败,这个时候就会使用cells数组;

(2) 中一共有三个判断

情况1、as == null || (m = as.length - 1) < 0; 这里的as是对cells的引用,如果成立表示此时cells数据还未进行初始化;这里的m是cells数组长度-1,如果m<0成立,证明此时cells数组的长度小于1,则可以理解此时cells数据还未初始化,然后进入到了longAccumulate方法中;

情况2、(a = as[getProbe() & m]) == null; 这里的getProbe()方法是获取当前线程的hash值,a的值是当前线程的hash值在cells数组中对应位置的值也就是Cell,如果为null,则证明当前线程还未对cells数组中的元素进行操作,然后进入到了longAccumulate方法中;

      static final int getProbe() {
  return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

情况3、!(uncontended = a.cas(v = a.value, v + x)); 这里的a为当前线程对应的Cell, 然后对应的CAS操作当前的Cell中的value值,如果不成功,则进入到longAccumulate方法中

  • void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
                //(3)
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //(4)
            if ((as = cells) != null && (n = as.length) > 0) {
                //(4.1)
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //(4.2)
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //(4.3)
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //(4.4)
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                //(4.5)
                else if (!collide)
                    collide = true;
                //(4.6)
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //(5)
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //(6)
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

该方法结合(2)中的三种情况来分析

情况1进入:此时cells数组还未初始化,执行到(3)的时候h = getProbe()为当前线程的hash值,因为当前线程还未对threadLocalRandomProbe值做任何操作,所以它肯定是0;所以(h = getProbe()) == 0;成立! 然后调用ThreadLocalRandom.current();

public static ThreadLocalRandom current() {
  if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
      localInit();
  return instance;
}
static final void localInit() {
  int p = probeGenerator.addAndGet(PROBE_INCREMENT);
  int probe = (p == 0) ? 1 : p; // skip 0
  long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
  Thread t = Thread.currentThread();
  UNSAFE.putLong(t, SEED, seed);
  UNSAFE.putInt(t, PROBE, probe);
}

到这里才对threadLocalRandomProbe进行了操作,然后在h = getProbe();获取最新的值; 这个时候因为as == cells == null,所以直接到(5);在(5)中cellsBusy是一个自旋锁,当等于0时,证明是无锁状态,可以获取锁;cells == as判断当前的cells是不是没有变动;然后再通过casCellsBusy()方法进行加锁,将cellsBusy赋值为1;在(5)的内部,先初始化一个大小为2的Cell[]数组,然后将通过位于运算将第0个或者第一个1元素的值设置为x的值,然后将这个新的数据赋值给cells数组,这个时候cells就不是空的了;最后释放锁,跳出循环结束;

情况二进入:此时cells数组肯定不为空,但是当前线程在cells中还没有设置过一个Cell元素;这个时候就到了(4)这一步,这个时候执行a = as[(n - 1) & h]==null判断,这里的h是当前线程调用了ThreadLocalRandom.current()之后的hash值,在做一次判断是否为null;

假设(4.1)成立;执行cellsBusy==0获取锁,尝试去new一个Cell对象,并且赋值为x;然后在尝试获取锁,并且通过CAS修改锁的状态,最后再次判断当前cells是否为空,当前线程是否没有对应的Cell元素,然后将新new的元素赋值给当前这个线程在cells数组中的对应位置的元素;最后释放锁结束;

假设(4.1)不成立;证明当前线程有对应的Cell元素,这个时候的wasUncontended=true;所以不会进入(4.2),所以看(4.3),此时的fn是等于null的,所以(4.3)就是a.cas(v = a.value, v + x),也就是将当前线程对应的Cell对象中的原子变量进行CAS操作,如果CAS成功结束循环;

假设(4.3)不成立;在(4.4)中设置collide = false;或者(4.5)中设置collide = true后,执行h = advanceProbe(h);重新设置当前线程的hash值,然后继续下一轮循环;

假设(4.6)成立;这里是是将cells数组进行扩容,然后继续下一轮循环;

情况三进入:此时对应线程是有cell元素的,只是cas失败了;然后进入(4.3)或者(4.6)继续循环;

代码(6)中是直接进行CAS操作base值;

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容