AtomicInteger位于java.util.concurrent.atomic包下,是对int的封装,提供原子性的访问和更新操作,其原子性操作的实现是基于CAS。
1. CAS
CAS(compare-and-swap)直译即比较并交换,提供原子化的读改写能力,是Java 并发中所谓 lock-free 机制的基础。CAS一般适用于竞争小的场景。CAS的思想很简单:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。可能会有面试官问 CAS 底层是如何实现的,在JAVA中,CAS通过调用C++库实现,由C++库再去调用CPU指令集。不同体系结构中,cpu指令还存在着明显不同。比如,x86 CPU 提供 cmpxchg 指令;而在精简指令集的体系架构中,(如“load and reserve”和“store conditional”)实现的,在大多数处理器上 CAS 都是个非常轻量级的操作,这也是其优势所在。(相比于重量级的系统调用,切换到内核状态)
CAS的缺点有以下几个方面:
ABA问题
如果某个线程在CAS操作时发现,内存值和预期值都是A,就能确定期间没有线程对值进行修改吗?答案未必,如果期间发生了 A -> B -> A 的更新,仅仅判断数值是 A,可能导致不合理的修改操作。针对这种情况,Java 提供了 AtomicStampedReference 工具类,通过为引用建立类似版本号(stamp)的方式,来保证 CAS 的正确性。也就是我这次的判断既要看内存的值,也要看版本。版本最简单的可以是每进行一次修改,版本号加1。这样ABA问题就不存在了。
循环时间长开销大
CAS中使用的失败重试机制,隐藏着一个假设,即竞争情况是短暂的。大多数应用场景中,确实大部分重试只会发生一次就获得了成功(重试原因在于CAS是一种乐观的锁机制,他尽可能不切换到内核态去挂起当前线程,而是尝试重新获取锁,也就是自旋,即一个死循环。但这点开销对比切换到内核还是小case)。但是总有意外情况,万一竞争很激烈呢?(比如滴滴打车在周六下午6点和周一下午3点,线程的竞争是完全不一样的)所以在有需要的时候,还是要考虑限制自旋的次数(也就出现了后面的自适应自旋),以免过度消耗 CPU。
只能保证一个共享变量的原子操作
Java中的CAS操作只是对CPU的cmpxchgq指令的一层封装。它的功能就是一次只原子地修改一个变量。这个指令总不能一次改2个变量吧。
2.AtomicInteger原理浅析
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
- 从 AtomicInteger 的内部属性可以看出,它依赖于Unsafe 提供的一些底层能力,进行底层操作;如根据valueOffset代表的该变量值在内存中的偏移地址,从而获取数据的。变量value用volatile修饰,保证了多线程之间的内存可见性。
- 下面以getAndIncrement为例,说明其原子操作过程
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
// compareAndSwapInt他就是通过unsafe类调用cpu指令cmpxchgq
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
- 假设线程1和线程2通过getIntVolatile拿到value的值都为1,线程1被挂起,线程2继续执行
- 线程2在compareAndSwapInt操作中由于预期值和内存值都为1,因此成功将内存值更新为2
- 线程1继续执行,在compareAndSwapInt操作中,预期值是1,而当前的内存值为2,CAS操作失败,什么都不做,返回false
- 线程1重新通过getIntVolatile拿到最新的value为2,再进行一次compareAndSwapInt操作,这次操作成功,内存值更新为3
- 另外看一个栗子,我们想让多个线程去自增同一变量。
/**
* 两个线程实现对同一变量的自增操作。
* mCount总能达到预期值
* j一般小于20000
*/
public class Demo6 implements Runnable {
private static AtomicInteger mCount = new AtomicInteger(0);
private static int j = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Demo6());
Thread t2 = new Thread(new Demo6());
t1.start();
t2.start();
Thread.sleep(1000);
// 总能保证输出20000
System.out.println(Demo6.mCount);
// 每次输出的值不一样
System.out.println(Demo6.j);
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
j++;
mCount.getAndIncrement();
}
}
}
- 上面j,即便改成
private static volatile int j = 0;
- j的值仍然不符合预期,因为j++本身就是非原子操作。j++这一句在字节码中会被拆成3句,getstatic、iconst_1、iadd,在getstatic时,的确是最新的,但是后两句的数据可能是过期的。
- 当然办法还是有的,
@Override
public void run() {
for (int i = 0; i < 500; i++) {
j++;
mCount.getAndIncrement();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 原因在于Thread.sleep(1000)的时候,CPU空闲,所以CPU会去同步主存和线程工作内存之间的值。不过加sleep()的实际意义是啥呢?我也不知道。我只知道这么用能符合预期。哈哈哈哈。
#########################################################
3.atomic包里的其他类都是基于CAS的。
此包经常用到的类AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference、AtomicStampedReference等,打开源码包我们可以看到其实这些方法是调用Unsafe类提供compareAndSwapObject、compareAndSwapInt、compareAndSwapLong等方法,Unsafe类的全称是sun.misc.Unsafe,这个类是属于JNI的类,Unsafe里面的native方法直接操作内存,getUnfate()仅供高级的Bootstrap类加载器使用,其实就是直接操作
CPU。
那么,compareAndSwap也就变得很浅显了,其实就是调用了CPU的系统原语,系统原语简单来说就是CPU的一个不能再分割指令,所以比较和执行是在CPU原子操作的,而不是像之前一样,从工作内存拷贝到执行引擎,这里就不是一个命令那么简单,所以CPU搞定的事情就是要比拷贝再去执行要靠谱。这个包的保证原子操作就是这样来保证的。
那么AtomicStampedReference又是用来干嘛的呢,看了之后发现端倪了,其实他就是用来解决ABA问题,AtomicStampedReference原子类是一个带有时间戳的对象引用。
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
- 注意注释里说的,if the current reference is == to the expected reference
and the current stamp is equal to the expected stamp. 即value和stamp都要达到预期才去更新,stamp相当于版本的作用。从而解决了ABA问题。