最近总是在分析源码,感觉源码也不是想象上的那么难,今天我来记录一下我对ConcurrentHashMap的理解。这里只敢说记录,不敢说分析,因为ConcurrentHashMap的代码确实有点难以理解,本文不对代码进行死磕,也就是说,可能不会对某一行代码进行死磕,而是对整个ConcurrentHashMap类进行整体的理解。
本文参考资料:
1.ConcurrentHashMap源码分析(JDK8版本)
2. 死磕 Java 并发:J.U.C 之 Java 并发容器:ConcurrentHashMap
3.深入浅出ConcurrentHashMap1.8
4.深入分析ConcurrentHashMap1.8的扩容实现
5.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》
1.为什么要使用ConcurrentHashMap?
其实,楼主都觉得这个问题问的非常傻逼,为什么使用ConcurrentHashMap呢?当然是为了线程安全呗。其实这个回答是比较笼统的,这里面还有很多的问题没有涉及到,比如说,使用最多的HashMap在多线程模型下的缺点,传统线程安全的HashTable的问题等等。
(1).线程不安全的HashMap
我们才学习HashMap的时候,就知道HashMap是线程不安全的,但是不知道HashMap由于多线程会导致什么问题。下面,我们来看一段代码:
public class Demo {
public static void main(String[] args) {
final Map<String, String> map = new HashMap<>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 100000; i++){
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}).start();
}
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
HashMap在并发执行put操作是会引起死循环,是因为多线程会导致HashMap的Entry链表形成数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
(2).效率低下的HashTable
HashTable使用synchronized关键字来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法,会进入阻塞或者轮询状态。如果线程1在使用put进行元素添加,线程2不但不能使用put方法添加元素也不能使用get方法来获取元素,所以竞争越激烈效率越低。
(3).ConcurrentHashMap
相较于笨重的HashTable,ConcurrentHashMap就显得非常高效。ConcurrentHashMap降低了锁的粒度,其中在1.7中,设置了Segment数组,来表示不同数据段,在1.8中,取消了Segment数组,进一步降低了锁的粒度。由于本文是分析1.8的ConcurrentHashMap,所以不对1.7的版本过多的解释。
2.ConcurrentHashMap的模型图
1.8在1.7的基础上进一步的优化,使得ConcurrentHashMap的锁的粒度进一步降低。我们还是先来看看ConcurrentHashMap的结构图:
我们看到的是,在1.8的内部,维持了一个Node数组,用来存储数据。实际上,ConcurrentHashMap结构与HashMap的结构类似,基本结构都是数组+链表+红黑树。但是在ConcurrentHashMap里面,红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。
3.ConcurrentHashMap的内部类
简单的看过ConcurrentHashMap的结构图之后,现在来了解一下ConcurrentHashMap的内部类,这个在后面我们理解源码有一定的帮助。
(1).Node类
Node类在ConcurrentHashMap的内部类是最基本的类,其中桶里面装的就是Node元素,还有就是TreeBin、TreeNode等都继承于Node类。我们先来看看Node的代码:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
我们来看一下Node成员变量:
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
hash用来存储该key的hash值,但是这里面有三个状态需要注意:
名称 | 值 | 描述 |
---|---|---|
MOVED | -1 | hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下,后面会详细的解释 |
TREEBIN | -2 | hash值为-2的Node,表示当前的节点是TreeBin |
RESERVED | -3 | 保留的hash值,用在ReservationNode上面 |
key用来存储的key值,val用来存储value值,这个就不用多说。next字段表示下一个Node,这个在出现哈希冲突时,将定位在相同位置上的Node使用链表连接起来。
我们还需要注意一点是,我们不能够通过调用setValue方法来改变Node的值。
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
(2).TreeNode类和TreeBin类
TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。
(3).ForwardingNode类
这个是辅助类,通常在扩容时用到。此时如果一个线程在进行扩容操作,另一个线程put一个元素进来,如果看到对应的位置上面是ForwardingNode对象的话,那么就参与扩容的操作队列来。其中ForwardingNode对象来源有两个:1.原来的位置为null,但是此时复制操作已经到当前位置的后面了,会将这个原来的桶的这个位置置为ForwardingNode对象;2.原来位置不为null,但是已经操作过这个位置了。
我们来来ForwardingNode的源码:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//省略了find方法代码
}
在ForwardingNode里面,我们发现了一个nextTable对象,这个对象表示扩容之后的数组,当一个线程访问到了一个ForwardingNode对象,就知道当前正在进行扩容操作,当前这个线程会帮助扩容(扩容分为两步:1.数组容量增大2倍;2.将原数组的元素拷贝到新数组里面去),将原数组的元素复制到这个nextTable里面去。
4.基本成员变量
我们在分析ConcurrentHashMap的源码时,还是先来看看它基本的成员变量。
变量名 | 描述 |
---|---|
table | 桶数组,用来存储Node元素的。默认为null,只在第一次put操作的进行初始化,该数组的长度永远为2的n次方。 |
nextTable | 默认为null,当不为null,表示当前正在进行扩容操作,这个数组就是扩容之后的数组,长度为原数组的两倍。 |
baseCount | 记录map中元素个数,由于是多线程操作,baseCount记录的不准确,所以要结合counterCells 来使用保证记录的正确性。 |
sizeCtl | 表初始化和扩容的控制位。其中,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作;当为0(默认值)时,表示当前table还未使用,此时table为null;当为其余正整数时,表示table的容量,默认是table大小的0.75倍,在代码中使用的是(n - (n>>>2))的方式来计算0.75 |
5.构造方法
在ConcurrentHashMap里面,我觉得有两个构造方法需要我们关注,一个是无参数的构造方法,这个构造方法非常的简单,是一个空的构造方法,还有一个就是带初始容量的构造方法:
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
在这个构造方法里面最主要的是,初始化sizeCtl。在这里保证sizeCtl必须是2的n次方,所以这里调用tableSizeFor方法的目的就是调整sizeCtl大小为2的n次方。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
假设给出大小为100,调用这个方法之后,会将大小调整为256。
6.table数组的初始化
在前面,对table数组的初始化也有所提及。table不会在构造方法里面进行初始化,而是在第一次put操作时,进行初始化。我们来看看:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//发现table数组尚未初始化,调用initTable方法来对table数组进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//省略了其余代码
}
在调用put方法时,如果发现table尚未被初始化,会调用initTable方法来初始化。我们来看看initTable方法的代码:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl小于0,表示已经有线程在对table数组进行初始化,现在这个线程要做的就是让出cpu时间,
//全力支持table初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//此时有一个线程在对table数组进行是初始化,如果使用CAS算法对sizeCtl字段更新成功,
//表示当前线程可以对table数组进行初始化。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
在initTable方法中,我们发现table数组只能被一个线程初始化。如果一个线程在初始化table数组,会将sizeCtl字段更新为-1,其他线程看到sizeCtl为-1时,就知道当前已经有线程在初始化table数组,他们需要做的是,全力支持那个线程初始化table数组--让出cpu占用时间。
同时,我们看到当table数组更新成功之后,sizeCtl更新为table数组大小的0.75倍:
sc = n - (n >>> 2);
7.put操作
我们使用HashMap最多的操作就是put操作和get操作。这里先对put进行分析。我们先总体看一下put的代码:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
这里,我先将整个put方法的代码贴出,让大家有一个总体上概念,然后再来一一解释,整个put操作的流程。
整个put操作分为4种情况:
1.如果当前table没有被初始化,会先初始化,重新进行put操作(循环操作,保证put成功)。
2.通过hash计算,定位位置,如果位置上为null,表示可以直接放进去。
3.如果对应位置的元素已经被标记为MOVED,即Node的hash为MOVED,那么表示当前table数组在进行扩容操作,此时,当前的线程会帮助扩容。
4.如果通过hash计算之后的位置不为null,表示出现了哈希冲突,此时会锁住当前这个位置上的Node对象,然后将新添加的Node添加这个位置链表或者红黑树上。
(1).hash计算
put操作的第一步就是hash计算,这里的hash计算,我认为分为两步:
1.调用spread方法将key的hashcode再次hash。
2.通过(n - 1) & hash方法来计算该元素定位的位置。
我们先来看看spread方法:
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
我先来解释一下>>>运算符的含义:
>>>的基本操作就是左移,然后高位补0,这里的左移表示连符号位都要跟着左移;而>>只是左移数值位,不移动符号位。
h ^ (h >>> 16)与运算过程如下图:
异或操作完成之后,然后跟HASH_BITS取与操作,我认为是为了消除符号位的影响。
至于(n - 1) & hash计算,其实就是hash % n的计算,具体的解释,大家可以参考我的Java 源码分析-ThreadLocal,这篇文章里面有相关的解释。
(2).元素放入数组
由于初始化table数组在前面已经说了,所以这里直接说赋值的情况。在这种情况下,还有分为两种情况:1.对应位置为null;2.对应位置不为null。这里将将两种情况分别讨论一下。
A.对应位置为null
这种情况比较简单,直接调用了casTabAt方法进行赋值:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
这里使用Unsafe类来对变量进行CAS更新,具体的实现原理,楼主也不是很清楚,实在抱歉!
A.对应位置不为null
这种情况下,就比较复杂了,我们先来看看代码:
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
这种情况下,新添加的Node肯定会添加到这个位置后面,当然为了保证线程安全,肯定锁住当前的Node,这样将锁的粒度降到了每个Node上,一个Node的put操作不会影响其他Node的操作。
锁住了之后,当然就要添加Node。这里分为两种情况,一种原来的Node不是TreeBin,也就是说原来就是一个链表,这样直接添加到链表的内部就行了;还有就是原来的Node本身就是一个TreeBin,那么我们通过调用TreeBin内部的方法来添加一个Node,至于红黑树的平衡性,让TreeBin自己来维持,我们外部不需要关注。
添加完成之后,还需要一个操作,那就是如果当前链表达到了阈值,需要将链表变为红黑树。这一步的解释在后面在详细的说。
8.更新baseCount
在putVal方法的循环执行完毕之后,一个Node肯定放入进去了,此时就需要调用addCount方法来更新baseCount变量:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//检测是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
addCount方法里面分为主要两个操作:1.更新baseCount;2.检测是否扩容。这里由于在讨论更新baseCount,所以不对扩容操作进行详细的解释,之后会做详细的解释。
更新baseCount的操作分成了两步:1.尝试更新baseCount变量;2.如果更新失败,或者counterCells为null会调用fullAddCount方法进行循环更新。
我们来看看fullAddCount的代码:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果counterCells数组对应位置上为null,创建一个cell,
//放在这个位置上。
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果对应位置上不为null,尝试更新value值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//如果对应不为null,并且更新失败,表示此时counterCells数组的容量过小,
//此时需要扩容。
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//初始化counterCells数组
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
这个方法的代码比较长,理解起来可能麻烦,我来说明整个方法的作用和执行的过程。
在解释这个方法之前,先解释一下baseCount和counterCells含义。两个都是用记录元素个数,只是记录的时机不同的。当要更新元素个数时,优先更新baseCount,如果baseCount更新成功的话,表示更新元素个数的操作已经完成了;如果更新失败的话,此时会考虑更新counterCells数组中某一个(随机的)cell的value值。因此,map的元素个数 = baseCount + 所有的cell的value值。
在调用fullAddCount方法之前,在addCount方法里面进行了两次尝试:1.尝试更新baseCount;2.尝试更新counterCells数组随机位置上的一个cell的value。如果这两次尝试都失败的话,则需要调用fullAddCount来保证元素个数正确的更新。
而这个fullAddCount方法的作用就是更新cell的value值。
整个fullAddCount方法的执行步骤:
1.如果counterCells为null,先初始化counterCells数组,默认大小为0。
2.如果counterCells不为null,根据产生的随机,通过(n - 1) & h找到一个位置,如果这个位置为null的话,创建一个cell对象放在这个位置上面。
3.如果这个位置上面不为null,首先会尝试更新这个cell的value值,如果更新成功的话,表示更新操作完成,否则执行第4步。
4.如果第3步的更新操作失败,表示此时counterCells数组的容量可能不足以应付这么多的线程了。所以此时counterCells数组需要扩容。
9.扩容操作
当我们put一个元素之后,如果此时元素已经达到了扩容的阈值了,就需要扩容。我们先来看看addCount方里面的扩容部分:
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//表示已经有线程在进行扩容操作
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
addcount方法扩容部分有两种情况:1. sc<0情况下,表示当前已经有线程在进行扩容操作了,此时当前这个线程需要参与帮助扩容的任务中来;2. 另一个情况,当前线程只有一个线程准备扩容操作。第一种情况为什么不调用helperTransfer方法来表示帮助扩容的含义,这个原因,我也不是很懂,但是比较了第一种情况的代码与helperTransfer方法的代码,两者其实比较类似。
现在我们来看看transfer方法究竟为我们做了什么。由于transfer方法代码过长,所以这里不完全展出来,这里使用分段形式来解释。
首先,如果是第一个线程调用transfer方法进行扩容操作,那么nextTable肯定为null。所以transfer方法的第一步就是创建新数组,新数组的容量是旧数组的两倍:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
扩容之后,此时需要将原来的数组进行复制操作了。这个过程支持多线程操作。在看这个过程之前,我们先来看看几个变量的含义。
变量名 | 类型 | 描述 |
---|---|---|
fwd | ForwardingNode | 用来作为占位符,表示当前位置已经被处理过 |
advance | boolean | advance为true,表示当前这个节点已经被处理,这个与fwd区别在于,advance为false表示当前节点(i位置)可以被处理 |
finishing | boolean | finishing为true,表示当前扩容操作已经操作完毕 |
接下来,我们来分析一下transfer方法的执行过程。大体的流程是,遍历这个原来的桶数组,然后复制到新数组里面去。
首先,必须找到一个可以被处理的Node节点,这个节点可以为null,也可以不为null,但是不能为fwd对象,因为如果是fwd对象,表示当前已经被处理过,没必要再去处理。
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
上面的代码中,只要advance为false,就退出while循环,表示当前尝试着处理这个位置上的Node。这里说的是尝试着处理,意思就是,可能不会被处理,比如说,已经有现在在处理了;或者这个位置已经被处理,已经为fwd了。
首先,如果这个位置上为null的话,那么直接将这个位置设置成fwd。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
如果当前这个节点已经被处理了,直接将advance设置为true,进行下一次位置的选择。
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
其他情况下,表示可以处理这个节点,这里处理的意思,是可以将这个位置上的所有节点拷贝到新数组里面去。这里分为两种情况,我们来一一的分析。
首先,第一种情况就是节点本身为一个链表结构。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将原来链表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//将原来链表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了
setTabAt(tab, i, fwd);
advance = true;
}
整个过程是非常简单,就是将原来的链表分为了两部分,一部分在原来的 i 位置上,一部分在 i+ n的位置上。这里涉及到了喜闻乐见的反转链表,这里不对反转链表做解释,后面会详细的解释反转链表的原因,到时候会回来详细的解释这段代码。
现在来看看复制操作的第二种类型。如果当前位置上的Node为TreeBin类型,表示为红黑树,此时要做的操作跟链表的操作也是非常类似,将树分为两个部分,一个部分在原来的i位置上,另一个部分在i + n的位置上。
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果此时节点数量小于阈值,那么将红黑树变为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
最后,如果所有的节点被赋值完毕之后,表示扩容操作已经完成了,此时finishing为true。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//将nextTable置为null,以便下次扩容
nextTable = null;
table = nextTab;
//将sizeCtl的值调整为长度的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//sizeCtl减一,表示当前有个线程参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
10.反转链表的原因
楼主的思路来自于深入分析ConcurrentHashMap1.8的扩容实现,如果大家可以参考一下。刚刚,我们对transfer方法的复制操作有了一个大概的理解,这里将对其详细分析。
在解释这段代码之前,我们先对这段代码里面几个变量有一个初步的理解。
变量名 | 类型 | 描述 |
---|---|---|
runBit | int | 位置为每个节点的hash值与n进行与操作,经过第一次循环,runBit记录的是最后一个hash值变化的Node的hash值。可能这里拗口,待会举一个详细例子。 |
lastRun | Node | 默认值为当前位置第一个Node,经过第一次循环,lastRun记录的是最后一个hash值变化的Node。 |
ln | Node | 默认值为null,是放在新数组 i 位置上的链表头结点。 |
hn | Node | 默认值为null,是放在新数组 i + n 位置上的链表头结点 |
在上面的表格中,提到最后一个hash值变化的Node,这句话可能比较拗口,这里举一个简单的例子来表达意思。
上图中是一个链表的模型,其中,我将这个链表中的节点分为两类:1.一类是节点的hash值相同的;2.第二类是跟第一类的hash值不同的。所以,从这个链表中我们可以得到,lastRun为节点6,runBit也是节点6与n进行与操作得到的值。为什么是节点6呢?因为在6之后,节点类型没有再变了。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将原来链表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//将原来链表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了
setTabAt(tab, i, fwd);
advance = true;
}
结合源码来分析,runBit默认为节点1的hash和n与操作的值。后面依次记录与上一次runBit的不同值,最后通过runBit == 0将所有的节点分为了两类。
现在我们结合上面的图做一个假设。我们假设紫色节点的hash值&n为0,那么蓝色节点的hash值&n就不为0。经过第一次循环,得出 ln = lastRun = 节点6,hn = null。
此时我们来分析第二次循环,先来看看紫色节点构成ln链表的过程。
我看到ln链表只是部分反转了,默认部分是没有反转了。其中得到的ln链表放在新数组的i位置上。
然后我们再来看看hn链表的构成过程。
我们发现hn链表是完全反转的。从而,我们得出一个结论,哪个链表默认为null,那个链表就是完全反转的。
最终,我们终于知道了,一个反转链表得出的原因,其实不是故意为之,而是加快复制操作速度,因为lastRun之后的节点进行复制操作不需要创建新节点。
11.get操作
前面详细的说了put操作和扩容操作,这里简单的分析一下get操作,get操作相比于前面的操作就非常的简单。先来看看源码:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//匹配成功
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//当节点为TreeBin或者ForwardingNode
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
整个get操作分为三种情况:1.table数组指定位置的第一个节点就匹配成功,直接返回;2.table数组指定位置上的hash值小于0,此时当前位置可能已经被其他在扩容时处理过,或者当前位置的Node为一个TreeBin,不管是那种类型的Node,调用的都是find方法来获取Node节点;3.其余情况下,直接遍历链表查找。
12.size操作
size操作里面,主要有两个方法提供,一个是传统的size方法,一个是ConcurrentHashMap比较提倡的mappingCount方法。我们先来看看size方法:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
size方法里面调用sumCount方法,sumCount方法才是真正计算元素个数的方法。我们来看看sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可能大家对这个方法计算方式不太好理解,其实这种计算方式取决于前面的设计方式。在前面我说到过,map的元素个数 = baseCount + 所有的cell的value值。这个结论是因为多线程更新个数时,如果只在baseCount里面记录,一个原因是多线程更新可能会出错,另一个原因是竞争性太大了。所以,设计理念就变成了,当baseCount更新失败时,表示baseCount变量当前忙碌,可以将这个增量添加到一个不忙的cell里面去,这样竞争性就降低了。
我们再来看看mappingCount方法:
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
达到的效果感觉跟size都差不多,只不过是一个返回int,一个返回long而已。