简介
HashMap是线程不安全的,所以 Java 还提供了 ConcurrentHashMap 类来解决高并发下的安全问题。
Java8 中,ConcurrentHashMap 相对于 Java7 来说, 它是通过 CAS 操作和 synchronized 来实现线程安全的, 而 Java7 是使用分段锁来实现线程安全的,并且Java7是对数组分段同步,而 Java8 是对数组元素同步。
这里大致看下的它的源码,ConcurrentHashMap 相对于 HashMap 来说,因为它处理的并发的情况,所以源码会复杂不少,这里做个大致了解与 HashMap 做个对比。
依然从 put 开始了解 ConcurrentHashMap 是如何实现线程安全的。
源码分析 Java8
put 方法
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//这里可以直接发现一个和 HashMap 不同的地方, ConcurrentHashMap 不支持含 null 的键值对
if (key == null || value == null) throw new NullPointerException();
//计算哈希值
int hash = spread(key.hashCode());
//这个值将用来记录链表或者红黑树长度
int binCount = 0;
//此处开始自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果数组不存在或者长度为 0,则初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果计算出来的位置在数组中还没有存放对象,那么通过 CAS 来放入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//由于可能在高并发的情况下, 所以正在扩容的时候也要考虑,这里会帮助扩容
tab = helpTransfer(tab, f);
else {
//如果该位置不为空,则可能有链表或者红黑树
V oldVal = null;
//使用 synchronized 同步该位置下对应的数组里的对象
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//如果存在链表
binCount = 1;
//遍历链表
for (Node<K,V> e = f;; ++binCount) {
//接来下的逻辑和 HashMap 一样,在链表尾部插入新对象
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果当前块里是红黑树
Node<K,V> p;
binCount = 2;
//此处遍历红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
//如果 key 相同, 那么根据 onlyIfAbsent 判断是否需要替换
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 链表节点大于8 转成红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//扩容的逻辑也在这里面
addCount(1L, binCount);
return null;
}
ConcurrentHashMap 的 put
方法,首先自旋,如果存放对象的块中为 null 时,将通过 CAS 来放入新键值对,如果已经存在对象的话,则使用 synchronized 给插入操作上锁。
如果发现正在扩容,那么将会利用并发的特性,来帮助扩容。
补上初始化数组的方法 initTable
。
initTable 方法
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//不断循环直到数组建立
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//如果 sizeCtl 小于 0,则说明有其他地方先初始化数组,所以放弃执行权
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//不然就通过 CAS 来修改 sizeCtl 值为 -1 ,告诉其它线程数组正在初始化
try {
//接下来就是创建一个数组
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
//创建完数组后就将 sizeCtl 修改
sizeCtl = sc;
}
break;
}
}
//返回创建好的数组
return tab;
}
可以看出 ConcurrentHashMap 初始化数组的核心是 sizeCtl 这个值,这个值是 volatile 修饰的,保证了它的可见性,通过判断这个值是不是小于 0,来决定是否需要执行数组的初始化。
最后
从这两段源码可以看出 ConcurrentHashMap 是怎么样实现一个线程安全的 HashMap 了。
这里引出一下 HashTable 为什么被弃用的问题。
HashTable 与 HashMap 的不同
- 不支持 null 键值对,HashMap 可以。
- 线程安全,是对修改过程同步实现的,这样效率会很低。而 HashMap 不是线程安全。
- 初始容量是 11, 扩容是 2 * oldCap + 1, HashMap 为 16,2 * oldCap。
- 初始容量即扩容大小不一样,所以计算 index 的值方法也不一样。
为什么弃用 HashTable
原因主要出在上面第二点,它的修改是对整个方法同步实现的,效率会低非常多,同时它与 HashMap 还是有许多不同的地方, ConcurrentHashMap 在容量以及扩容规则等都延续了 HashMap。