ConcurrentHashMap
JDK1.8抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized
来保证并发安全性。
创建ConcurrentHashMap对象,使用一个参数的构造函数时做了什么?
使用put方法干了哪些事情?
怎样扩容的?
且带着疑问,根据源码一步步分析。
在分析之前,先明确几个重要的成员变量。
- table:数组,默认为null,用来存储Node节点数据,扩容时大小总是2的幂次方
- nextTable:默认为null,扩容时新生成的数组,其大小为原数组的两倍
- sizeCtl:默认为0,用来控制数组的初始化和扩容操作。负数表示有线程正在扩容。
ConcurrentHashMap一个参数构造器的过程
//带1个参数构造器
public ConcurrentHashMap(int initialCapacity) {
// 小于抛出异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 对于给定的预期容量作出合理规划。注:MAXIMUM_CAPACITY为2的30次方,MAXIMUM_CAPACITY >>> 1为2的30次幂为536870912,一般initialCapacity不会设置这么大的
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//tableSizeFor方法可以转换为sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】
// 例如,initialCapacity为7,sizeCtl=cap=16
this.sizeCtl = cap;
}
put方法的过程
public V put(K key, V value) {
// put方法里直接去调用了putVal方法
return putVal(key, value,false);
}
putVal方法分析
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key键和value值不能为 null
if (key == null || value == null) throw new NullPointerException();
// 计算hash值,将Key的hashCode值与其高16位作异或再按位与int的最大值从而保证最高位为0(从而保证最终结果为正整数)
// 通过spread函数,int hash = (key.hashCode() ^ (key.hashCode() >>> 16)) & HASH_BITS
// HASH_BITS=int型的最大值,即十六进制0x7fffffff,二进制 0111 1111 1111 1111 1111 1111 1111 1111
int hash = spread(key.hashCode());
// 局部变量,binCount默认是0,只有hash冲突了才会大于1.且他的大小是链表的长度(如果不是红黑数结构的话)。
int binCount = 0;
//循环,因为后面是CAS操作,可能会需要大量的重试
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果没数组为空,调用initTable方法初始化创建数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 找到下标,如果为空,采用CAS进行插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果没放成功,继续向下走,因为这肯定是出现了并发操作,所以去判断没放成功的理由.如果放成功了,那就结束循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果 hash 冲突了,且 hash 值为 -1,说明是 forwarding node 对象(这是一个占位符对象,保存了扩容后的容器)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);// 帮助数据迁移
else { // 这里就是数组已经有元素了,这时候就该挂链表或者挂树了
V oldVal = null;
// 获取头节点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
// 头节点的hash值,大于0表示这下面有点东西
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //for循环,表示遍历链表,循环一次后binCount加1
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 到最后了没重复的key,就把新值向后面挂
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是个树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 插节点,调用TreeBin的putTreeVal方法
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断链表的长度,如果大于8,然后转树
if (binCount >= TREEIFY_THRESHOLD)
// 这里要注意一个地方!!!!! --不是说像HashMap那样转树就没事了
// 这里涉及到一个核心思路,CurrentHashMap做了优化,这里如果数组长度小于64,它会先扩容,扩容代表什么含义
// -- 原来的链表会被1分为2 分别散落在不同的节点上
treeifyBin(tab, i);
// 如果key已存在,有原值,返回原值
if (oldVal != null)
return oldVal;
break; // 结束外层死循环
}
}
}
// 元素计数加1,根据binCount来检验是否需要检查和扩容
addCount(1L, binCount);
return null;
}
initTable方法进行数组初始化
// 在上面的putVal源码里,当数组为空或长度为0的时候,需初始化,调用了initTable()方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 需要注意的是,当整形的变量sc(即sizeCtl)小于0,那么说明有其他线程在在扩容,就调用线程的yield()方法
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 使用sun.misc.Unsafe的compareAndSwapInt方法设置当前对象的sizeCtl为-1,设置成功后,初始化数组,默认容量 DEFAULT_CAPACITY 为16
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 变量sc的值为(n-(n>>>2)),n-(n>>>2)=n-(n/2^2)=n-n/4=3/4*n=0.75*n,默认n开始为16,16*0.75=12
// 可知,负载因子为0.75
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
链表转树,treeifyBin方法分析
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//MIN_TREEIFY_CAPACITY为64
// 虽然进入到转树方法,如果数组长度小于64,那么先扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1); // 扩容,下面详细说
// 确定头节点没问题开始加锁,转树
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历链表,生成一棵树
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 把数据放到树中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
tryPresize方法进行扩容
private final void tryPresize(int size) {
// 如果大小大于等于MAXIMUM_CAPACITY(2的30次幂)的一半,那么直接扩容为MAXIMUM_CAPACITY,否则扩容为1.5的size加1再向上获取最近的2的整数次幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 如果sizeCtl大于等于0
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 数组table为空
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// CAS修改SIZECTL为-1,表示数组table正在进行初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 确认其他线程没有对数组table修改
try {
//
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
// sc=n-n/4=0.75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 数组table不为空,如果扩容大小没有达到阈值,或者超过最大容量2的30次方,跳出while循环
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 生成戳
int rs = resizeStamp(n);
if (sc < 0) {// 有线程在进行扩容
Node<K,V>[] nt;
/**1.第一个判断 sc右移RESIZE_STAMP_SHIFT(16)位,也就是比较高ESIZE_STAMP_BITS(16)位生成戳和rs是否相等
* 相等则代表是同一个n,是在同一节点下进行的扩容,
* 2.第二个和第三个判断 判断当前帮助扩容线程数是否已达到MAX_RESIZERS(2的16次方-1=65535)最大扩容线程数
* 3.第四个和第五个判断 为了确保transfer()方法初始化完毕
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 跳出循环
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 移动和拷贝节点到新数组
transfer(tab, nt);
}
/**没有线程在进行扩容,那么CAS修改sizeCtl值,作为扩容的发起,rs左移RESIZE_STAMP_SHIFT(16)位+2
* 此时sizeCtl高RESIZE_STAMP_BITS(16)位为生成戳,低RESIZE_STAMP_SHIFT(16)位为扩容线程数
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 移动和拷贝节点到新数组
transfer(tab, null);
}
}
}
移动和拷贝节点到新数组,transfer函数
//该方法通过全局的transferIndex来控制每个线程的迁移任务
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n为旧tab的长度,stride为步长(就是每个线程迁移的节点数)
int n = tab.length, stride;
// 单核步长为1,多核为(n>>>3)/ NCPU,最小值为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 新的 table 尚未初始化
if (nextTab == null) { // initiating
try {
// 扩容2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 更新
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败, sizeCtl 使用 int 最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable为全局属性
nextTable = nextTab;
// 更新转移下标,就是老的tab的length
transferIndex = n;
}
int nextn = nextTab.length;// 新 tab 的 length
// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// 完成状态,如果是 true,就结束此方法
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
int nextIndex, nextBound;
// 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
// 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true
// 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)
// 其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进
if (--i >= bound || finishing)
// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进 这里的目的是:
// 1. 当一个线程进入时,会选取最新的转移下标。
// 2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
// 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
i = -1;
advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 这个值就是当前线程可以处理的最小当前区间最小下标
bound = nextBound;
// 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
i = nextIndex - 1;
// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶
// 下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况
advance = false;
}
}
// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {// 如果完成了扩容
nextTable = null;// 删除成员变量
table = nextTab;// 更新 table
sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
return;
}
// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;// 不相等,说明没结束,当前线程结束方法
// 如果相等,扩容结束了,更新 finising 变量
finishing = advance = true;
i = n; // recheck before commit // 再次循环检查一下整张表
}
}
// 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
else if ((f = tabAt(tab, i)) == null)
// 如果成功写入 fwd 占位,再次推进一个下标
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED,MOVED=-1
// 说明别的线程已经处理过了,再次推进一个下标
advance = true; // already processed
else {// 到这里,说明这个位置有实际值了,且不是占位符节点。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;// low, height 高位桶,低位桶
if (fh >= 0) {
// 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则0)
// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
// 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他
int runBit = fh & n;
// 尾节点,且和头节点的 hash 值取于不相等
Node<K,V> lastRun = f;
// 遍历这个桶 接下来是常规的设置操作,我们先略过
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位树
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
}
}
}
总结
- 使用了 CAS 加 synchronized 来保证了 put 操作并发时的危险(特别是链表)
- 采用了 数组+链表+红黑树 的数据结构
- 单线程初始化,多线程协同扩容
FAQ:
- 什么时候触发扩容?
- 链表转换为红黑树时(链表节点个数达到8个会转换为树),数组长度小于64
- 数组中总节点数大于阈值(数组长度的0.75倍)
-
如何hash定位
h^(h>>>16)&0x7fffffff,先将hashCode的高16位和低16位异或运算,这个做目的是为了让hash值更加随机。和0x7fffffff(int的最大值)相与运算是为了得到正数,因为负数的hash有特殊用途,如-1表forwarding node(表示该位置正在扩容) -
扩容时,扩容后的容量是原先的几倍?单线程扩容吗?
2倍,多线程协同扩容,每个线程负责一块区域的复制迁移任务
转自 https://segmentfault.com/a/1190000037672616
思否的 fai1017 是本人