[Java]重学Java-原子类

JUC包下的原子类

JUC就是大名鼎鼎的java并发包,我们今天来看看基于非阻塞性算法的CAS封装的原子类.
JUC下有AtomicIntegerAtomicLongAtomicBoolean等类,

UML
  • 在多线程的环境下对count变量进行自增
public static AtomicLong count = new AtomicLong(0);

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        executorService.execute(() -> System.out.println("现在对原子类进行自增:" + count.addAndGet(1)));
    }
    Thread.sleep(1000);
    System.out.println("原子类的值:"+count.get());
}

这里我们通过代码验证了,多线程环境下,原子类是可以保证线程安全的。

AtomicLong

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // unsafe实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 偏移量
    private static final long valueOffset;

    // JVM是否支持Long类型的CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();


    private static native boolean VMSupportsCS8();

    static {
        try {
            // 获取value在AtomicLong中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    // 实际变量
    private volatile long value;

    public AtomicLong(long initialValue) {
        value = initialValue;
    }
    
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

这里有几个细节:

  1. value被volatile修饰,保证了内存可见性.
  2. 所有的CAS操作依赖Unsafe

Unsafe类

JDK中的Unsafe类提供了硬件级别的原子性操作,他提供的都是nat
ive方法,Java使用JNI的方式调用C++的本地方法。我们简单来看看Unsafe类的核心方法.

  • sun.misc.Unsafe
    // 获取对应属性的偏移量
    public native long objectFieldOffset(Field var1);
    
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    // 比对对象 obj 中偏移量 offset 的变 的值是否与 expect 相等,相等使用update更新  
    // 然后返回 true ,否 返回 false
    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    
    // 唤醒调用park后阻塞的线程。
    public native void unpark(Object var1);
    // 阻塞当前线程
    public native void park(boolean isAbsolute, long time);

更好性能的原子类-LongAdder

AtomicLong通过CAS保证了原子性,只借助CPU即可完成一个轻量级的"锁",但是在高并发的场景下,还是会出现多个线程去争夺一个变量的控制权这种情况,这会造成线程直接频繁竞争,浪费CPU资源。
在一些分布式的项目中,为了满足"秒杀",我们通常会将仓库进行内部拆分,尽可能切分成小的颗粒度,然后通过一些策略去扣除缓存中的库存数,在统计时,再统一读取所有"分片"的数据即可。

cell

LongAdder内部维护了多个cell变量,每个cell的初始值为0,当并发量上来的时候,线程会去抢夺空闲的cell的控制权,如果A竞争cellA失败,它会转向竞争cellB,如此一来,便减少了如AtomincLong出现的单一变量遭到多线程频繁竞争的问题。

在做读操作的时候,LongAdder直接统计base+所有cell的值返回即可.

  • java.util.concurrent.atomic.LongAdder#add
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
  1. LongAdder并不会一开始就创建cell数组,惰性加载.
  2. 如果cell数组为空,那么先对base值进行累加操作.

伪共享问题

之前我们聊过,为了解决CPU和内存直接运行速度的差距,操作系统设计了多级缓存架构,其中CPU和主内存之间,还存在着高速缓冲层,也就是CPU Cache.
CPU Cache内部是按行存储的,每一行称为Cache行,它是主存和Cache直接交换数据的基本单位。
CPU需要访问变量值时,会从CPU Cache查看是否有该变量,如果有直接读取,如果没有则从主内存读,然后回种到CPU Cache,这里回种的时候就有一个问题,CPU Cache是按行存储,变量的大小可能没有那么大,此时就会出现多个变量值存储在同一个Cache行中,当发生多线程访问该变量时,只能有一个线程访问同一个缓存行,此时就会出现排队竞争的情况,也就是伪共享.

缓存行

如何解决伪共享问题

解决的思路是将变量值填充到跟cache行一样的大小,这样刚好一行只存放一个变量,那么线程之间竞争xy的时候就会去访问不同的缓存行。
在JDK8中,可以使用@sun.misc.Contended注解声明,让JDK为你完成这部分工作.

LongAdder如何解决伪共享

答案在代码里面:

  • java.util.concurrent.atomic.Striped64.Cell
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

LongAdder源码解析

我们先来看看LongAdder的类图结构:

LongAdder

结合上面的代码,我们可以知道,cell类其实是定义在Striped64类中,Striped64包含几个元素:

  1. base值,它是LongAdder的一个基础值.
  2. cell数组,用来减少多线程竞争同一变量进行了切分.同时为了方便读取,使用了volatile修饰value值,使其具备可见性.
  3. cellsBuy,自旋锁开关.通常在数组进行创建、初始化、扩容的时候进行变化.

读取LongAdder的值-sum

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

代码比较简单,基本上就是累加sumcell中所有的值,注意,这里的值只是读取那一刻的快照值,不是100%准确,因为没有做任何加锁措施。

累加操作-add

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
  1. 判断当前cells是否为空,如果不为空对base值进行累加
  2. 如果cells不为空或者casBase失败了,那么进入执行代码块.
  3. 对cell数组进行定位,也就是找到当前线程去访问哪一个cell.
    我们继续来看看longAccumulate.
  • java.util.concurrent.atomic.Striped64#longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 当前cell数组元素大于CPU个数
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 是否有冲突
            else if (!collide)
                collide = true;
            // 当前元素个数未达到CPU个数并且有冲突,扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                // 变量被volatile修饰,这里不用加锁
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

在定位的过程中,我们会遇到冲突的问题,我们回忆一下在HashMap的底层,是通过拉链法解决哈希冲突的问题。
在LongAdder中,在初始化、扩容的时候都会涉及到cellsBusy这个标志位,如果当前cellsBusy为1,说明当前有线程在进行扩容或者初始化工作,此时其他线程会等待。
我们在Initialize table这行注释里面可以去看到cell数组初始化的过程.

初始化的过程可以理解为,多个线程CAS抢夺cellsBusy的控制权,抢到的线程进行初始化,其他线程进行等待.

扩容的过程有几个先决条件: cells中的元素个数小于当前机器CPU个数并且发生了CAS冲突(多个线程同时CAS竞争同一个cell),其中产生了CAS失败,那么就会进行扩容操作.为什么要有这个条件,这是为了解决更合理的使用CPU核心数资源(思考一下伪共享的情景),每个CPU分配一个cell无疑是最佳的.扩容的过程是将数组增大一倍,然后将元素进行迁移.

以上,便是大致的流程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容