上一篇文章介绍了基本的常量、方法以及构造方法,这一篇开始分析核心方法put(),put()中主要完成的是对元素的新增或覆盖,结束之后进入addCount()去累加修改次数以及判断是否需要扩容。
一、put()
public V put(K key, V value) {
return putVal(key, value, false);
}
/**
* onlyIfAbsent: true -> 仅在不存在时添加,false -> 不存在添加,存在则覆盖
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 和HashMap不一样,这里是不允许键或值为null的
if (key == null || value == null) throw new NullPointerException();
// 通过扰动函数计算出hash
int hash = spread(key.hashCode());
// 如果桶位中是链表则表示遍历节点时的下一个节点的下标
// 如果桶位中是红黑树则为固定值2
int binCount = 0;
/**
* 自旋
* tab:当前table
*/
for (Node<K,V>[] tab = table;;) {
/**
* f:key对应的桶位上的第一个Node
* n:table数组长度
* i:key对应的数组下标
* fh:f的hash值
*/
Node<K,V> f; int n, i, fh;
// 检查table是否初始化
if (tab == null || (n = tab.length) == 0)
// 初始化
tab = initTable();
// tabAt:获得table数组i下标元素的最新值,为null则直接cas存放元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 操作成功直接退出,失败则继续自旋
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// MOVED = -1,判断f是否为forwarding node
// 如果f为forwarding node表示正在扩容,并且当前桶位转移完毕
else if ((fh = f.hash) == MOVED)
// 协助扩容
tab = helpTransfer(tab, f);
else {
// 如果是覆盖操作,表示覆盖之前的元素值
V oldVal = null;
// 锁住桶位中头节点
synchronized (f) {
// 检查锁住的元素(防止在加锁之前头节点发生变化)
if (tabAt(tab, i) == f) {
// f的hash >= 0表示f非红黑树节点
if (fh >= 0) {
// f的下一个节点下标
binCount = 1;
// e:表示当前元素
for (Node<K,V> e = f;; ++binCount) {
// e的key值
K ek;
// 判断key是否存在
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// key已经存在,记录当前节点的val
oldVal = e.val;
// onlyIfAbsent为false时才覆盖value
if (!onlyIfAbsent)
e.val = value;
break;
}
// pred指向当前遍历的节点
Node<K,V> pred = e;
// e指向下一个节点,检查是否为null
if ((e = e.next) == null) {
// e为null表示整个链表中没有相应的key,所以直接从尾部插入
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// f为红黑树的代理节点TreeBin
else if (f instanceof TreeBin) {
// 如果红黑树中存在key,p表示对应的Node,否则为null
Node<K,V> p;
// binCount置为固定值2
binCount = 2;
// 走红黑树的插入逻辑
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// key已经存在,记录key对应的value
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0表示桶位中是存储了Node的
if (binCount != 0) {
// binCount到达树化阈值时,进入treeifyBin尝试树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// oldVal不为null表示桶位中存在对应的key
if (oldVal != null)
// 返回未覆盖前的value
return oldVal;
break;
}
}
}
// 核心方法,涉及到很多逻辑,下面分析
addCount(1L, binCount);
return null;
}
二、addCount()
addCount()前半部分是和LongAdder的add方法一样,都是利用cell数组去化整为零的累加方案,后半部分则是扩容逻辑,其中真正的扩容方法是在transfer()中,transfer()方法下一篇文章再来分析。
private final void addCount(long x, int check) {
/**
* 上半部分部分的代码和LongAdder的add方法基本是一样的,之前已经分析过了,这里不在赘述
* 没有看LongAdder源码的一定要先看一下
* as:并发累加的cell[]
* b:base变量
* s:累加之后的值,即map当前元素个数
*/
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 和LongAdder中的longAccumulate()一样
fullAddCount(x, uncontended);
// 之所以在这里返回而不走下面扩容逻辑是因为
// fullAddCount()比较耗时,这里就不让当前线程继续走下去了
return;
}
// addCount会被put和remove的逻辑中调用,remove中会传入-1,移除元素一定不会扩容
// put逻辑中传入的值一定是大于等于0,等于0表示桶位中没有元素,等于1表示key已存在并且
// 是桶位的第一个元素,只做替换操作也一定不会出发扩容所以直接返回
if (check <= 1)
return;
// 统计base和cell[]中累加的值
s = sumCount();
}
// 扩容逻辑
if (check >= 0) {
/**
* tab:旧table数组
* nt:新table数组
* n:旧数组长度
* sc:扩容阈值
*/
Node<K,V>[] tab, nt; int n, sc;
/**
* s >= (long)(sc = sizeCtl):s大于等于扩容阈值,true表示需要扩容
* (tab = table) != null和(n = tab.length) < MAXIMUM_CAPACITY)
* 一般不会出现,这里是防止某些极小概率的异常情况
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 前置条件:table需要扩容
// 扩容戳,只有是对同样长度的数组扩容才能得到相同的扩容戳
// 比如两个线程都要去扩容16-->32,那么就会计算出相同的扩容戳
int rs = resizeStamp(n);
// 前面提过了,sizeCtl < 0表示正在初始化或者扩容,能走到这里table一定已经初始化了
if (sc < 0) {
/**
* 前置条件:ConcurrentHashMap正在扩容
* case1:右移16位计算出sizeCtl的扩容戳,判断是否与当前线程相等,如果不相等,
* 说明当前线程扩容的长度和正在扩容的长度不一致
* case2:sc == rs + 1是个bug,作者真正想写的是
* sc == (rs << 16) + 1 表示扩容完毕
* case3:sc == rs + MAX_RESIZERS也是bug,实际想要写的是
* sc == (rs << 16) + MAX_RESIZERS 表示参与扩容的线程已满
* case4:新数组为null表示扩容完毕
* case5:transferIndex <= 0表示所有需要扩容的桶位已经被分配完了
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 满足任一条件都不需要扩容了,所以直接退出循环
break;
// 尝试修改sizeCtl,在其低16位上累加一个1表示新增了一个协助扩容的线程
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 真正的扩容方法
transfer(tab, nt);
}
// table还未扩容,需要当前线程来进行扩容
// 这里通过cas方式来尝试修改 sizeCtl => 高16位:扩容戳,低16位-(1 + 1)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
三、总结
和HashMap的put()相比首先ConcurrentHashMap不允许键或值为null。同时为了保证线程安全,新增或覆盖元素时用的是cas或者synchronized锁住头节点操作。最后一点则是ConcurrentHashMap添加完元素之后先记录修改次数,然后执行扩容的逻辑,和HashMap顺序不同。