前言
1.8后的ConcurrentHashMap与之前有截然不同的设计,之前是分段锁的思想,通过采用分段锁Segment减少热点域来提高并发效率。1.8利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。
在此再一次膜拜Doug Lea大神,高山仰止。1.8的ConcurrentHashMap有6313行代码,之前大概是1000多行。
这篇文章也只是概括了部分功能。
相关概念
table
所有数据都被存放在table数组中,大小是2的整数次幂,存储的元素分为三种类型
- TreeBin 用于包装红黑树结构的结点类型 ,它继承了Node,代表它也是个节点,hash为-2,内部有个root变量指向红黑树的头节点,封装了众多红黑树方法
- ForwardingNode 扩容时存放的结点类型,并发扩容的实现关键之一 ,是一个标记,代表此处不需要扩容。
- Node 普通结点类型
Node
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//辅助map.get()方法
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
//不支持set方法改变value值
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
value和next都用volatile修饰,保证并发的可见性
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
ForwardingNode作用在扩容期间,它是一个标记
nextTable
扩容时新生成的数组,其大小为原数组的两倍
sizeCtl
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
用于table数组初始化及扩容控制,下面来看看它是如何控制的:
接下来的思路是沿着sizeCtl来跟踪源码,但是很多方法具有多种功能,比如addCount...会牵扯很多其它的概念,这里就把他们先剔除出去,先沿着一条简单的线来解读
sizeCtl在初始化与扩容中的作用
1,初始化:ConcurrentHashMap有五个构造器,不考虑构造时指定集合的,其他四个都没有在初始化期间创建table数组对象,而是将这一操作下放到第一次调用put插入键值对时。sizeCtl决定了table数组的大小,无参构造器则sizeCtl为默认值0,他会直接影响到table数组的大小,为16;传入了初始值大小,经过tableSizeFor将初始值改为2的n次幂
final V putVal(K key, V value, boolean onlyIfAbsent) {
.......省略
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
......省略
initTable会被调用
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
首先sc = sizeCtl,将sizeCtl值保存起来,之后当一个线程cas设置sizeCtl值为-1成功,之后的线程都将被拒绝,通过执行Thread.yield()。也就是说只允许一个线程执行初始化table 数组操作。sc == 0则table大小为16,否则就为sizeCtl大小的值。数组创建完成后sizeCtl = n - (n >>> 2),相当于原先值的0.75,这之后sizeCtl代表阀值。
2,接下来看看它在扩容上的控制: 扩容-是transfer方法,在addCount里被调用,而addCount在putVal最后被调用,所以想要理解sizeCtl的变换得先从put操作开始。(这里只是按照这条线来说,这些方法在很多地方都有调用)
在分析put前先来看看三个方法:tabAt,casTabAt,setTabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
ABASE表示table中首个元素的内存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的内存偏移地址:
- tabAt利用Unsafe来从内存直接获取tab数组中i位置的节点。为什么不用table[i]来直接获取?虽然table是volatile修饰的,但并不能保证数组中的元素是最新的,所以直接从内存上拿到最新的值,
- casTabAt利用cas原子操作来设置i位置的头节点,所以旧值一共是null才能设置成功。
- setTabAt利用Unsafe直接更改内存上tab[i]处节点。
为什么有两个写操作?下面是注释
* Note that calls to setTabAt always occur within locked regions,
* and so in principle require only release ordering, not
* full volatile semantics, but are currently coded as volatile
* writes to be conservative.
意思是:setTabAt 调用是在拥有锁的状态下,也就是被synchronized保护起来的状态下,所以没有原子性的考虑。
put
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算出hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
(直接在代码里注释显示不清,所以下面就是在分析代码逻辑)
- key与value都不能为null
- spread计算key的hash值。(h ^ (h >>> 16)) & HASH_BITS;//0x7fffffff;保证了hash >= 0.
- 循环+cas,循环cas是Java相较于synchronized的另一种锁实现,之前文章介绍过。
- 如果tab == null执行initTable操作,上面介绍过。
- 利用tabAt取出i处头节点赋给f,若为null则利用casTabAt设置头节点。
- 若f的hash == MOVED,说明有线程在i处正在执行扩容操作,执行helpTransfer,该线程帮助执行扩容任务,之后再新数组中再添加值。
- 以上情况都不是,利用synchronized 锁住头节点f确保线程安全,区分是链表还是树,执行插入操作;如果是链表,那么在遍历过程中++binCount,最后如果binCount > 8,调用treeifyBin树化
- 在成功插入了一个新的元素后,addCount会被调用,这个方法一共做了两件事,增加个数,扩容。
addCount
两个功能:增加个数,检测是否进行扩容。这里主要分析扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
CounterCell,baseCount与size等计算map元素个数的方法有关,之后介绍;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
.....省略.........................................
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
一开始说过这段源码的解析是沿着sizeCtl这条线,来看看它的作用的,所以这里省略了addCount的其他功能。
前面sizeCtl在经过initTable后,代表的是阀值,当元素个数>=sizeCtl后进行扩容,但是不要忘了多线程环境,假设一个场景:sizeCtl此时代表阀值,多线程下其中一个线程进入了while循环里,调用resizeStamp方法(先不管该方法的意义),之后由于sizeCtl > 0,执行CAS将sizeCtl设置为(rs << RESIZE_STAMP_SHIFT) + 2,之后调用transfer(tab, null)扩容,这时nextTable一定为null,所以transfer方法传null。那么现在就已经有了一个线程在进行扩容操作,那么对于其他进入循环里的线程该如何处理?它们在条件允许下会帮助进行扩容操作,每增加一个线程就cas将sizeCtl +1 。
这里的条件允许指的是扩容未完成即nextTable!=null;数组未分配完即transferIndex>0(看下面扩容部分);sizeCtl未发生变化因为扩容结束或有其他线程抢先该值都会发生变化;扩容线程未超过允许值即MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
先来看看resizeStamp方法
private static int RESIZE_STAMP_BITS = 16;
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
numberOfLeadingZeros返回数的二进制中左侧0的个数,比如传入2左侧有30各0,就返回30;传入32,左侧有26个0就返回26;
resizeStamp:比如传入16,返回数的二进制是1000000000011011;
回到上面假设场景:第一个线程调用resizeStamp后得到数rs,之后cas将rs左移16位再加2。以n =16为例,sizeCtl变为1000000000011011 0000000000000010。此时sizeCtl < 0,后16位的大小假设为N,则代表目前有N-1个线程在执行扩容操作。
在第一个线程将sizeCtl改变后,sizeCtl<0,其他线程会进入if (sc < 0) {}代码块中,判断能否执行扩容操作,就是上面说的五个条件判断。这里可以看到sizeCtl对扩容操作的影响。
上面的分析可以看到sizeCtl在并发扩容期间起到的重要作用
总结一下sizeCtl的变化
table 初始化:
1,根据你调用的构造函数的不同,比如无参则sizeCtl = 0,initTable中将数组初始化为16;
若传了大小,则先经tableSizeFor改变大小确保为2的n次幂,之后赋给sizeCtl,
initTable中将数组初始化为sizeCtl大小
2,=-1 在初始化数组期间,即initTable里为了保证只有一个线程能够初始化table数组,
线程会利用cas将sizeCtl改为-1,之后的线程检测到sizeCtl< 0会退回到就绪状态
3,数组初始化完成后sizeCtl变为为阀值,大小为0.75倍数组大小
扩容:
第一个执行扩容的线程会将sizeCtl变为< 0,扩容期间sizeCtl低16位数大小假设为N,
则代表有N-1个线程在执行扩容操作。
下面的源码分析可以看出,很多方法会判断sizeCtl的正负,<0则代表正在扩容,>0则代表阀值
在上面以sizeCtl为线的分析中,出现很多方法,还有一些方法的功能没有分析全,下面来分析分析它们
扩容
transfer
扩容涉及到两个操作:1,新建新数组nextTable。2,将原数组元素移到新数组。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
创建nextTab数组失败,sizeCtl 此时为负值,重新赋值,这样其他线程便会来初始化新数组,
可去查看addCount帮助理解
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
.........
如果nextTab == null,新建一个新数组大小为原数组的2倍。多线程下应该只允许一个线程创建nextTab数组,那么如何实现?在前面提到的addCount方法里实现,通过cas改变sizeCtl值,其他线程在条件允许下会调用transfer(tab, nt),nt为创建后的nextTab数组,这样就实现了多线程并行扩容
int nextn = nextTab.length;
之前介绍过,它就是个标记节点,标识
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
下面for循环里面的while循环是为了配合CAS操作,即自旋CAS乐观锁形式,
意思是当该线程设置transferIndex失败,就在尝试一次,但是问题随之而来
如何控制什么时候结束?这就是advance的作用
boolean advance = true;
boolean finishing = false; 代表旧数组元素是否已经全部移到新数组
i就是该线程在属于自己的stride区里移动的指针;bound就是该区域的边界(包含边界)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
首先将i减一,再与bound比较,意思是如果大于bound则代表线程还没有处理完自己的stride区域
finishing为true,代表当前线程是唯一的扩容线程了,其他的扩容线程已经结束,
该最后留下的线程会执行table重赋值操作,逻辑在第二块代码中
if (--i >= bound || finishing)
advance = false;
transferIndex就是前一个stride区域的边界值(被包含在前一块区域里),
它为0代表数组中没有剩下的空间需要你操作了
else if ((nextIndex = transferIndex) <= 0) {
i = -1; i设为-1,为了进入第二块代码,在哪里检测是否完成扩容
advance = false;
}
运行到这里则代表数组中还有未分配的位置,那么就执行CAS将transferIndex重新赋值,
得到该区域的边界bound与指针i
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
这里我的理解是:
首先这里的for循环意味着一个线程它将一个位置上的链表或树移到新数组后,又会再次循环移动另一位置直到整个数组全部完成。for循环整体代码分为5块,上面贴的是第一块。
这第一块的作用是什么?ConcurrentHashMap允许多线程并发扩容,那么位置如何选择?多线程下如何保证全部位置都转移完?这就是第一块代码的功能。在方法代码一开始计算了一个stride值,它根据你的cpu数与数组大小计算得来,最小值为16,它的作用是?每个线程扩容都会从数组中得到一块区域,这块区域的转移工作归该线程负责,完成就去重新申请下一块区域,这块区域大小就是stride。
比如第一个线程他的区域是[n-1,n-stride],第二个线程的区域[n-stride-1, n-2*stride]。
上述功能的具体实现在代码中已详细说明。
接下来看看第二块:能进入该块代码说明数组已经被线程们分配完了,等他们全部执行完自己的stride区域,扩容就完成了。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
finishing为true,将nextTable 归零,table重新赋值,
sizeCtl重新变为阀值,为新数组大小的0.75
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); 相当于0.75
return;
}
cas将sizeCtl减一,因为已经不需要该线程帮助扩容了,该线程可以结束了
此时的sizeCtl后16位大小等于当前扩容线程数+1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
这里如果相等代表此线程是最后一个扩容线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // 再次循环一次,会执行if (finishing){}里代码,之后退出,扩容结束
}
}
接下来第三块:
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
如果该位置为空,则用cas添加一个ForwardingNode节点,之前说过它是个标记,hash值为-1,它标志着该位置为不需要扩容操作,可能该位置原本就为null,或者已经执行了扩容操作;
ConcurrentHashMap运行中有些线程在插入,有些线程在执行扩容,如何避免相互影响?
- 在put中发现当前位置节点hash为-1,也就是ForwardingNode,那么put线程转而执行helpTransfer操作,帮助执行扩容,扩容完成后再插入;
- 具体插入与扩容代码是被synchronized保护的,而它们的锁都是头节点对象。获取不到锁的线程会等待,那么之后重新获取到锁还能够继续执行吗?我的意思是例如线程A要在该位置插入,由于该位置正在执行扩容,锁被占用,于是A阻塞,之后扩容执行完,A获取到了锁应该继续执行插入操作吗?不应该,这就是在synchronized 代码中再次校队头节点的必要性,即if (tabAt(tab, i) == f)判断。对于A线程他会在putval的for代码块内再循环一次,会检测到此时头节点的hash值已为-1,执行helpTransfer操作,帮助进行扩容,整体扩容完成后再插入节点。
来看看第四块:
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
表明该位置已经扩容过了,重赋值advance为true,确认下一位置i
最后第五块:
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
该位置的链表按runBit的值分为两类,
int runBit = fh & n;
Node<K,V> lastRun = f;
按runBit节点分成两类,每当遇到不同于之前节点值就标记为lastRun
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
........省略树的操作,逻辑与上面类似
}
}
}
这一块是真正执行扩容操作的一块,被synchronized保护,锁是头节点,获取锁后会再次对头节点进行校队,确保没有改变。
画了个图来说明上面代码的操作,假设有七个节点,1,2,6,7节点runBit为0.
在上面代码经历第一个for循环后,lastRun为6,其runBit为0,其后lastRun被赋值给ln,那么hn为null。
第二个for循环后,节点分类情况,2为ln,4为hn,在该过程中节点会被倒序
最后利用setTabAt将以ln为头的链表放到nextTab的i位,即仍放在原为不动,将以hn为头节点的链表放到nextTab的i+n位置,接着将tab的i位改为fwd,即标记节点ForwardingNode,告诉其他线程该位置已经扩容完成。最后将advance = true,继续循环去找下一位置扩容。
上面提过在putVal时如果发现该位置头节点hash为-1,即ForwardingNode节点,调用helpTransfer
helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length); 前面解析过
循环的这些判断条件为tue的话表明扩容未结束
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { 扩容时sizeCtl一定小于0
resizeStamp的参数大小不变则值相等,sizeCtl的高16位就是resizeStamp的返回值左移16位
sc == rs + 1这个判断对应的情况没想明白
MAX_RESIZERS表示并发扩容所允许的最大线程数
transferIndex <= 0在上面分析过,扩容中transferIndex表示最近一个被分配的stride区域的边界,
<=0代表数组被分配完了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
cas配合while循环构成自旋CAS,保证操作原子性。将sizeCtl+1,
sizeCtl此时的低16位为N=扩容线程数+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
回到putVal逻辑,在扩容操作整体完成后,线程回到putVal中,再次循环将节点插入。
Size
之前在解析addCount时有部分代码被省略,省略的那部分代码与ConcurrentHashMap的size操作有关。对于ConcurrentHashMap来说table中的节点数量是个不确定的值,你没法停下所有正在执行各种操作的线程们来统计准确数字,所以最终得到的只是个估计值。下面来看看如何统计出来的。
首先来看看一些相关内部类与变量:
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
同步状态,利用CAS从0改为1代表获取锁
private transient volatile int cellsBusy;
初始大小为2,每次扩容翻倍,存储CounterCell对象,该对象有个value变量,用来存储个数
private transient volatile CounterCell[] counterCells;
它也被用来存储个数,在下面的源码分析中发现,在counterCells为空时会将个数累加在bseCount里;
若counterCells非空,在将个数存储进counterCells失败后,会将其累加进baseCount
private transient volatile long baseCount;
mappingCount与size
这两个方法都是统计个数的,不同在于size返回int,mappingCount返回long,文档注释建议使用mappingCount
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看出sumCount是关键。统计的方法就是遍历counterCells将每个位置存储的值相加再加上baseCount的值,和就是此时的个数估计值。
为了搞清一开始说的三个变量的用途,回到addCount里被我省略的部分:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
counterCells
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
h大小初始化为0x9e3779b9;用来(n - 1) & h得到相应位置的CounterCell
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
先初始化CounterCell,value就是参数x即个数;cellsBusy相当于AQS中的同步状态,
即可以看成是一个锁,获取锁操作就是cas将其从0改为1;被锁保护的代码中,
将CounterCell对象放进(n - 1) & h位置,再将cellsBusy = 0;成功就结束循环
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; 说明该位置已被其他线程放入了CounterCell
}
}
collide = false;
}
在addCount中如果CAS设置CounterCell中的value失败,uncontended为false,
即参数wasUncontended为false,所以它表示CAS失败。回到addCount在发现能够执行到cas设置
value这步说明前面三个判断都为false,即表明CounterCell非空,且数组该位置CounterCell不为null,
说明已经记录了值。
else if (!wasUncontended)
wasUncontended = true; 设为true,重新cas设置CounterCell的value
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; counterCells 已被更改或counterCells数组达到最大值;
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
来看看无限for循环中的处理逻辑:
- 如果counterCells数组不为空
- 利用(n - 1) & h取出该位置CounterCell,若为空,初始化CounterCell对象,CAS更改同步状态cellsBusy的值,可以看成是一种获取锁的操作,将对象放入该位置。break退出循环,方法结束。
- 若该位置为null,检测wasUncontended值,false说明addCount利用CAS更改该位置CounterCell的value值失败,那么对于失败该如何处理?再次CAS尝试更改该为值的value值?并不,看源码发现除了将wasUncontended 改为true后,最后会调用h = ThreadLocalRandom.advanceProbe(h)更改h的值,也就是将值x存储在其他位置,该位置可能为null或已存储值
- 该位置不为null,CAS更改value值,成功则退出循环,方法结束。
- 这里的判断成立则代表counterCells 已被更改或counterCells数组达到最大值,更改h值继续尝试。
- !collide判断,collide若为true代表上个位置非空;以我的理解collide的作用便是在对CounterCels数组扩容前(即下个else if判判断中的操作),再次更改h值再循环尝试一次,增加一次尝试的机会,可能扩容数组再次尝试操作对性能有影响吧。
- 前面的尝试都失败,那么就扩容counterCells 数组,再次尝试
- counterCells 数组未初始化,首先获取锁,初始化数组大小为2,将值x存储在h & 1位置;来看看它的判断条件(cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)),先确认同步状态cellsBusy的值,再确认数组counterCells并未被其他线程更改,最后CAS来获取锁。看到问题了吗?如果在你确认前两个后在CAS之前它们被更改了,怎么办?在代码里看到在操作之前仍会再次判断counterCells == as,若被更改,则init为false,再次for循环处理。
- 上面都没成功,那就把x加到baseCount里,结束方法。
总结一下就是:cellsBusy是同步状态,将以下四个操作隔离开。1.给数组中某为空的位置存储CounterCell对象操作。2. 给数组中某非空的位置,累加其CounterCell的value值。3. CounterCell数组扩容。4. 初始化CounterCell数组。
注:从上面一路分析下来,一个问题ConcurrentHashMap如何保证线程安全性?
在put中,真正添加节点的操作是被synchronized保护的,还有扩容时移动节点的操作也是被synchronized保护起来的,它们时安全的。那对于共享变量的操作呢?虽然它们都用了volatile修饰,但很多操作会根据它们原先的值来决定新值,这就需要原子性的保证,所以采用自旋CAS确保这些共享变量值更改的安全性。
但是并不是自旋CAS原子改变同步状态就再配合synchronized就万事大吉了。比如再addCount里,一个线程CAS将sizeCtl更改,之后调用transfer,先初始化nextTable数组,但是这是多线程环境,情况远比想象的要复杂,比如,在你未初始化完nextTable之前,其他线程transfer由于检测到了sizeCtl<0,要来帮助扩容,若直接调用transfer(tab, nt);而此时nextTable还未初始化完成,那么那些线程就会去执行初始化nextTable操作,这是不允许的,所以在源码中会先对线程进行扩容条件判断,在判断合格后,仍会使用CAS尝试更改sizeCtl,为什么?因为很多状态的改变都会更改sizeCtl的值,比如扩容完成了,那么sizeCtl会被改变;比如有其他线程抢先,那么transferIndex与sizeCtl一定改变了,这些情况的发生可能在你条件判断合格之后,那么就不能让改线程轻易去调用transfer,所以才会有CAS对sizeCtl的再次判断。从这里也看出了sizeCtl的重要性,它与太多情况相关,不禁再次感慨大神的高山仰止。
所以我认为ConcurrentHashMap是以synchronized与CAS为基,每种操作都充分考虑到不同的情况下实现的线程安全
在ConcurrentHashMap中还有一部分是与红黑树的变化有关,如一开始提到的TreeBin,后面的文章再说
参考
深入浅出ConcurrentHashMap1.8
ConcurrentHashMap源码分析(JDK8版本)
更好地理解jdk1.8中ConcurrentHashMap实现机制