非阻塞同步
互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。
互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。
1、不可变
不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。
不可变的类型:
* final 关键字修饰的基本数据类型
* String
* 枚举类型
Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。
对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
public class ImmutableExample {
public static voidmain(String[] args) {
Map map = new HashMap<>();
Map unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
Exception in thread "main"java.lang.UnsupportedOperationException
atjava.util.Collections$UnmodifiableMap.put(Collections.java:1457)
atImmutableExample.main(ImmutableExample.java:9)
Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。
public V put(K key, V value) {
throw newUnsupportedOperationException();
}
2、 CAS
随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。
乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:CAS(Compare and Swap),即比较并替换,实现并发算法时常用到的一种技术,Doug lea大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。
CAS的思想很简单:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。
从思想上来说,Synchronized属于悲观锁,悲观地认为程序中的并发情况严重,所以严防死守。CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去尝试更新。
Java语言CAS底层如何实现?利用unsafe提供的原子性操作方法。
原理:
1.在内存地址V当中,存储着值为10的变量。
2.此时线程1想要把变量的值增加1。对线程1来说,旧的预期值A=10,要修改的新值B=11。
3.在线程1要提交更新之前,另一个线程2抢先一步,把内存地址V中的变量值率先更新成了11。
4.线程1开始提交更新,首先进行A和地址V的实际值比较(Compare),发现A不等于V的实际值,提交失败。
5.线程1重新获取内存地址V的当前值,并重新计算想要修改的新值。此时对线程1来说,A=11,B=12。这个重新尝试的过程被称为自旋。
6.这一次比较幸运,没有其他线程改变地址V的值。线程1进行Compare,发现A和地址V的实际值是相等的。
7.线程1进行SWAP,把地址V的值替换为B,也就是12。
CAS的缺点:
1)CPU开销较大
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力。
2)不能保证代码块的原子性
CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。
3)ABA问题
这是CAS机制最大的问题所在,什么是ABA问题?怎么解决?
当一个值从A更新成B,又更新会A,普通CAS机制会误判通过检测。利用版本号比较可以有效解决ABA问题,A-B-A就变成1A-2B-3A。
J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
使用场景:
1)Atomic系列类
AtomicInteger是一个支持原子操作的 Integer 类,就是保证对AtomicInteger类型变量的增加和减少操作是原子性的,不会出现多个线程下的数据不一致问题。如果不使用 AtomicInteger,要实现一个按顺序获取的ID,就必须在每次获取时进行加锁操作,以避免出现并发时获取到同样的ID的现象。
接下来通过源代码来看AtomicInteger具体是如何实现的原子操作:
首先看incrementAndGet() 方法,下面是具体的代码。
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current +1;
if(compareAndSet(current, next))
return next;
}
}
通过源码,可以知道,这个方法的做法为先获取到当前的 value 属性值,然后将 value 加 1,赋值给一个局部的 next 变量,然而,这两步都是非线程安全的,但是内部有一个死循环,不断去做compareAndSet操作,直到成功为止,也就是修改的根本在compareAndSet方法里面,compareAndSet()方法的代码如下:
public final boolean compareAndSet(int expect, int update) {
returnunsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
compareAndSet()方法调用的compareAndSwapInt()方法的声明如下,是一个native方法。
publicfinal native boolean compareAndSwapInt(Object var1, long var2,int var4, intvar5);
compareAndSet 传入的为执行方法时获取到的 value 属性值,next 为加 1 后的值, compareAndSet所做的为调用 Sun 的 UnSafe 的 compareAndSwapInt 方法来完成,此方法为 native 方法,compareAndSwapInt 基于的是CPU 的 CAS指令来实现的。所以基于 CAS 的操作可认为是无阻塞的,一个线程的失败或挂起不会引起其它线程也失败或挂起。并且由于 CAS 操作是 CPU 原语,所以性能比较好。
类似的,还有decrementAndGet()方法。它和incrementAndGet()的区别是将 value 减 1,赋值给next 变量。
2)Lock系列类
3)Synchronized转变为重量级锁之前,也会采用CAS机制
3、原子类AtomicInteger
J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操作。
以下代码使用了 AtomicInteger 执行了自增的操作。
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
以下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt()。
public final int incrementAndGet() {
returnunsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
以下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操作需要加的数值,这里为 1。通过getIntVolatile(var1, var2) 得到旧的预期值,通过调用 compareAndSwapInt() 来进行 CAS 比较,如果该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。
可以看到 getAndAddInt() 在一个循环中进行,发生冲突的做法是不断的进行重试。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 =this.getIntVolatile(var1, var2);
}while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
4、 线程本地存储(Thread Local Storage)
如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。
符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。
可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。
ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储,其实意思差不多。ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
private ThreadLocal myThreadLocal = new ThreadLocal();实例化了一个ThreadLocal对象。每个线程仅需要实例化一次即可。虽然不同的线程执行同一段代码时,访问同一个ThreadLocal变量,但是每个线程只能看到私有的ThreadLocal实例。所以不同的线程在给ThreadLocal对象设置不同的值时,他们也不能看到彼此的修改。
先了解一下ThreadLocal类提供的几个方法:
public T get() { }
public void set(T value) { }
public void remove() { }
protected T initialValue() { }
get()方法是用来获取ThreadLocal在当前线程中保存的变量副本,set()用来设置当前线程中变量的副本,remove()用来移除当前线程中变量的副本,initialValue()是一个protected方法,一般是用来在使用时进行重写的,它是一个延迟加载方法。
首先我们来看一下ThreadLocal类是如何为每个线程创建一个变量的副本的:
public T get() {
Thread t =Thread.currentThread();
ThreadLocalMap map =getMap(t);
if (map != null) {
ThreadLocalMap.Entrye = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result =(T)e.value;
return result;
}
}
returnsetInitialValue();
}
第一句是取得当前线程,然后通过getMap(t)方法获取到一个map,map的类型为ThreadLocalMap。然后接着下面获取到<key,value>键值对,注意这里获取键值对传进去的是 this,而不是当前线程t。
如果获取成功,则返回value值。
如果map为空,则调用setInitialValue方法返回value。
在getMap中,是调用当期线程t,返回当前线程t中的一个成员变量threadLocals。
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
那么我们继续取Thread类中取看一下成员变量threadLocals是什么:
ThreadLocal.ThreadLocalMap threadLocals = null;
实际上就是一个ThreadLocalMap,这个类型是ThreadLocal类的一个内部类,我们继续取看ThreadLocalMap的实现:
static class ThreadLocalMap {
static class Entryextends WeakReference> {
/** The valueassociated with this ThreadLocal. */
Object value;
Entry(ThreadLocalk, Object v) {
super(k);
value = v;
}
}
.......
}
可以看到ThreadLocalMap的Entry继承了WeakReference,并且使用ThreadLocal作为键值。
public void set(T value) {
Thread t =Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this,value);
else
createMap(t, value);
}
ThreadLocal是如何为每个线程创建变量的副本的:
首先,在每个线程Thread内部有一个ThreadLocal.ThreadLocalMap类型的成员变量threadLocals,这个threadLocals就是用来存储实际的变量副本的,键值为当前ThreadLocal变量,如下面例子中的longLocal,value为变量副本(即T类型的变量),如下面中的Thread.currentThread().getId()。为何threadLocals的类型ThreadLocalMap的键值为ThreadLocal对象,因为每个线程中可有多个threadLocal变量。
初始时,在Thread里面,threadLocals为空,当通过ThreadLocal变量调用get()方法或者set()方法,就会对Thread类中的threadLocals进行初始化,并且以当前ThreadLocal变量为键值,以ThreadLocal要保存的副本变量为value,存到threadLocals。
然后在当前线程里面,如果要使用副本变量,就可以通过get方法在threadLocals里面查找。
总结一下:
1)实际的通过ThreadLocal创建的副本是存储在每个线程自己的threadLocals中的;
2)为何threadLocals的类型ThreadLocalMap的键值为ThreadLocal对象,因为每个线程中可有多个threadLocal变量,就像上面代码中的longLocal和stringLocal;
示:1:
public class Test {
ThreadLocal<Long> longLocal = new ThreadLocal<Long>();
ThreadLocal<String> stringLocal = new ThreadLocal<String>();
public void set() {
longLocal.set(Thread.currentThread().getId());
stringLocal.set(Thread.currentThread().getName());
}
public long getLong() {
return longLocal.get();
}
public String getString() {
return stringLocal.get();
}
public static void main(String[] args) throws InterruptedException {
final Test test = new Test();
test.set();
System.out.println(test.getLong());
System.out.println(test.getString());
Thread thread1 = new Thread(){
public void run() {
test.set();
System.out.println(test.getLong());
System.out.println(test.getString());
};
};
thread1.start();
thread1.join();
System.out.println(test.getLong());
System.out.println(test.getString());
}
}
输出结果为:
示例2:
为了理解 ThreadLocal,先看以下代码:
public class ThreadLocalExample1 {
public static voidmain(String[] args) {
ThreadLocalthreadLocal1 = new ThreadLocal();
ThreadLocalthreadLocal2 = new ThreadLocal();
Thread thread1 = newThread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = newThread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}
它所对应的底层结构图为:
每个 Thread 都有一个ThreadLocal.ThreadLocalMap 对象。
/* ThreadLocal values pertaining to this thread. This map ismaintained
* by the ThreadLocal class.*/
ThreadLocal.ThreadLocalMap threadLocals = null;
当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。
public void set(T value) {
Thread t =Thread.currentThread();
ThreadLocalMap map =getMap(t);
if (map != null)
map.set(this,value);
else
createMap(t, value);
}
get() 方法类似。
public T get() {
Thread t =Thread.currentThread();
ThreadLocalMap map =getMap(t);
if (map != null) {
ThreadLocalMap.Entrye = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
returnsetInitialValue();
}
ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。
在一些场景 (尤其是使用线程池) 下,由于ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况,应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。
5、Copy-On-Write
可参考Java容器相关(3)-- 同步容器和并发容器章节相关内容。