为了解决线程安全问题,同时又为了照顾效率的问题,java从1.5就有了ConcurrentHashMap。从而代替了HashTable。1.7的ConcurrentHashMap 是用了多个(默认16)ReentrantLock来解决线程安全问题的。1.8的ConcurrentHashMap 则用了cas来解决的。
如果想读ConcurrentHashMap源码的话,一定要具备以下知识点。
1.volatile关键字。
2.unsafe的cas方法
在阅读ConcurrentHashMap之前,如果读过HashMap (JDK1.8)的源码,也会有一定的帮助。
java.util.HashMap(JDK1.8)
下边只是一些比较简单的源码,有很多我也不是很清楚,欢迎大家沟通。
initTable
在map(在这里指ConcurrentHashMap,下边不在赘述)添加的时候,一定会初始化这个Node数组,但是因为多线程的原因,不能有多个线程同时去初始化,所以这里用while来监控,如果有别的线程正在扩容,则用yield方法去重新竞争cpu资源。
private final Node<K,V>[] initTable() {
//sizeCtl 负数代表正在进行初始化或扩容操作
//(-1代表正在初始化,-N 表示有N-1个线程正在进行扩容操作。0代表hash表还没有被初始化,正数表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念)
Node<K,V>[] tab; int sc;
//开始初始化
//因为table是volatile修饰的,如果table不为空了。直接跳出循环。
while ((tab = table) == null || tab.length == 0) {
//进入此判断,表示已经有线程正在初始化或者扩容了。
if ((sc = sizeCtl) < 0)
//线程从运行状态变为就绪状态。
Thread.yield(); // lost initialization race; just spin
//通过unsafe类的cas把sizeCtl更换为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//todo 暂时想不通为什么sc会大于0 ,多这个判断的意义是什么,。因为sc并不是volatile修饰。上边只有赋值,赋值完马上判断
//table大小默认值为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建node数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//更新阈值 16 - 4 = 12. 是16 * 0.75
sc = n - (n >>> 2);
}
} finally {
//把sc赋值到sizeCtl上。
sizeCtl = sc;
}
break;
}
}
return tab;
}
helpTransfer
在map添加元素的时候,如果此时map正在扩容。此时不会阻塞,而会进入helpTransfer这个方法, 如果看过hashmap扩容源码,则知道扩容的时候会有数组的copy动作。此时这里会多线程进行拷贝。这个很NB。
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//先通过此时tab的大小计算一个校检码
int rs = resizeStamp(tab.length);
//用while监控是否扩容完毕。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//如果已经扩容完毕,或者不能在扩容了。则直接跳出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer
这个扩容方法,我自己看了会,没有看太懂,我是从这篇文章中摘取的扩容方法的解析。
ConcurrentHashMap
/**
* 一个过渡的table表 只有在扩容的时候才会使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 证明这是一个Node节点
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//对TreeBin对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造正序和反序两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
}
}
}
}
}
putVal
接下来就可以看put方法了。 put方法会直接调用putVal方法,他比put方法多一个onlyIfAbsent参数。
该参数的意思为是否只在key为空的时候添加。(或者理解为是否覆盖value)
如果上边那些方法看明白了。HashMap的put方法看过,那么这个还是很简单的。
//添加方法。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//concurrentHashMap不允许key或value为空
if (key == null || value == null) throw new NullPointerException();
//获取key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
//开始循环,并且赋值tab数组。成功后才会跳出循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//首次添加会进入此判断。
if (tab == null || (n = tab.length) == 0)
//初始化node数组
//因为这是一个死循环,初始化后,会再次循环。
tab = initTable();
//通过key的hash值来寻找位置,如果当前位置为空,则进入判断
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas直接放入,如果放入成功。然后跳出循环。没有成功继续循环。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果正在扩容。则帮助扩容
else if ((fh = f.hash) == MOVED)
//多线程扩容,返回新的tab
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//当该位置已经有值 则上锁。
synchronized (f) {
//判断是该值是否已经改变
if (tabAt(tab, i) == f) {
//如果f的hashcode大于0.则该节点是连边。
if (fh >= 0) {
binCount = 1;
//循环判断,如果hash值一样,并且equals也想等,则覆盖,如果e已经是最后一个元素,则链下去
//在这里每循环一次,就会++binCount.为下边是否变成树做准备
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果该节点是树。则用树的方式往下链
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果bincount大于8 则该位置变为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//增加当前的容量,在增加的时候 也会根据容量来扩容。
addCount(1L, binCount);
return null;
}
get
这个方法没什么特殊的。跟hashmap的getnode方法差不多,就是找位置然后是树就从树中取,是链表就从链表中取
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//如果当前tab没问题并且该位置有值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果该位置的第一个就是该值、就直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果是树。则从树中寻找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表,则next下去找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}