一、概述
从本篇文章开始,我将会对concurrent包下常用的类的使用以及源码学习进行一个记录,本篇文章主要是来了解下ConcurrentHashMap的源码。ConcurrentHashMap其实我们都不陌生了,是一种线程安全的Map集合,接下来我们就来看下的内部实现。
本文中的源码是基于JDK1.8的。
二、源码
1. 继承结构及构造方法
首先,我们来看下ConcurrentHashMap的继承机构:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
首先,ConcurrentHashMap继承自AbstractMap,前面学习HashMap的时候曾简单说过,AbstractMap这个类提供了Map接口的一些简单实现,用来减少该接口实现的一些重复操作,以最小化实现该接口所需的工作量;而接口ConcurrentMap,根据文档描述,这个接口是用来提供线程安全性和原子性保证的映射;最后,实现了Serializable,表明该类支持序列化。
public ConcurrentHashMap()
public ConcurrentHashMap(int initialCapacity)
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
public ConcurrentHashMap(int initialCapacity, float loadFactor)
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
构造方法的话,都比较常规,不多说了。
2. 常用属性
首先说下,JDK1.8之后,ConcurrentHashMap的内部存储结构其实和HashMap是一样的,都是通过数组 + 链表 + 红黑树来实现的。有一些属性和HashMap是相似的,我们这里就简单略过。
// 默认初始化容量
private static final int DEFAULT_CAPACITY = 16;
// 最大容量,2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;
//最大数组容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//树形化的三个参数,不多说
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
然后再看下两个比较特殊的属性:
private transient volatile int sizeCtl;
该属性是用于控制数组大小和扩容的标识符,值不同有不同的含义:
- 如果是负数,表示数组正在初始化或者扩容;如果是-1,表示正在初始化,-N 表示有N-1个线程正在进行扩容操作;
- 如果是0或者是正数,表示table还没有被初始化,该值表示初始化或下此扩容的大小,类似于扩容阈值;它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。
另外的一个参数:
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
该参数是默认的并发级别,JDK1.8中已经不再使用,只是为了与以前的版本兼容才定义的。由于JDK1.8之前ConcurrentHashMap是通过分段锁来实现的,ConcurrentHashMap由多个Segment组成,而每个Segment都有对应的一把锁来实现线程安全。JDK1.8之后,Segment仍然存在,而该字段是服务于Segment的,所以,两者都是为了兼容原先的版本而存在的。
3. 相关节点类
3.1 Node类
保存table数组的元素,一个Node就是table中的一个元素。该类无法进行修改,只能进行只读遍历操作:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//构造方法,get方法
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
Node类的valu及next属性都使用了volatile关键字修饰,表示在并发环境下的修改对其他线程都是可见的,并且Node类提供了一个find方法用于辅助get方法的调用。
3.2 TreeNode和TreeBin
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
}
TreeNode表示红黑树的具体实现,但正如该类的注释而言:Nodes for use in TreeBins,该类只是为TreeBin类服务,最终有TreeBin来操作红黑树。TreeBin用于封装维护TreeNode,包含了一系列对树的操作方法。
4. 实现
ConcurrentHashMap中使用了大量的CAS算法,总体上实现线程安全的方式是通过CAS + synchronized来实现的,至于CAS,可以参考该文章:CAS原理深度分析,接下来我们来分析下几个主要的方法,首先,还是来看下put方法。
4.1 put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. key和value都不能为空
if (key == null || value == null) throw new NullPointerException();
// 2. 对key的hashCode再进行一次计算得到hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 3. 无限循环操作,直到插入成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果表为空,或表的容量为0,初始化表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 表不为空,且表中对应元素已经存在,比较并替换
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果该节点的hash值为MOVED,说明正在进行扩容操作,则当前线程会优先进行帮助扩容操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 其他情况下的操作
else {
V oldVal = null;
// 对hash桶进行加锁操作,
synchronized (f) {
// 再次检查表中下标为i的节点是否为同一个节点
if (tabAt(tab, i) == f) {
// 链表节点
if (fh >= 0) {
// 这里的bitCount = 1
binCount = 1;
// 又一个无限循环,这里bitCount自增,表示链表中的节点数量
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果节点的hash值相等,并且key也相等,那么先将旧值取出,然后进行替换操作
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 其他情况,如果当前节点的下一个节点为空,也就是最后一个节点
// 则将新节点在尾部插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果节点是树类型
else if (f instanceof TreeBin) {
Node<K,V> p;
// 这里的bitCount = 2
binCount = 2;
// 执行树的插入逻辑
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// bitCount判断
if (binCount != 0) {
// 大于树形化阈值,进行树形化操作
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 返回旧的值
if (oldVal != null)
return oldVal;
break;
}
}
}
// bitCount加1
addCount(1L, binCount);
return null;
}
方法有点长,我们来依次说下:
- 首先,ConcurrentHashMap的key和value都不能为空,这点和HashMap有点不同;
- 首先,计算key的hash值,然后进入循环;如果表为空,初始化表;如果不为空,通过key的hash在表中查找元素,如果找不到,说明该hash桶为空,通过CAS操作将值放入桶中;如果hash值为-1,也即是MOVED,则说明桶正在进行扩容,帮助进行扩容操作;
- 上面情况都不满足,则开始进行分段加锁,遍历操作。首先对桶中的第一个节点进行加锁,然后对链表进行遍历,根据hash值和key值进行比较,存在更新,不存在插入进去;如果该节点是红黑树类型,则进行红黑树的插入逻辑;
- 判断链表的长度是否达到树形化阈值,如果达到了,进行树形化;
在进行put操作的时候,会涉及到一些方法,我们来简单看下。先看下初始化方法initTable:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl小于0,说明table正在进行初始化或扩容,则当前线程让出CPU资源
// 让其他线程执行
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 比较SIZECTL与sc,相等设置为-1,不相等说明其他线程正在初始化或者已经初始化完成
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再次判断数组是否初始化
if ((tab = table) == null || tab.length == 0) {
// 然后比较sc的值是否大于0,大于0则n为sc,否则,n为默认初始容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc = n * 3/4,注意这里无符号右移的计算
sc = n - (n >>> 2);
}
} finally {
// 最后设置sizeCtl的值
sizeCtl = sc;
}
break;
}
}
return tab;
}
从这里可以看到,table的值是与sizeCtl相关的。
然后来看下另一个方法:tabAt方法,该方法是一个CAS方法,为了方便CAS方法的调用,在ConcurrentHashMap中定义了Unsafe的一个实例:
private static final sun.misc.Unsafe U;
而该方法则是用于获取table中下标为i的节点:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
另一个CAS方法casTabAt,则是一个比较交换的操作:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
比较下标为i的节点是否是c,如果是,替换为v节点;
另外,还涉及到树形化方法treeifyBin,及扩容方法tryPresize,树形化方法就不说了,来看下扩容方法:
private final void tryPresize(int size) {
// 如果给定的容量大于了最大值的1/2,则直接扩为最大值,否则走tableSizeFor方法
// tableSizeFor方法是找到大于等于count的最小的2的幂次方
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// sizeCtl大于等于0,扩容
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 同样先判断是否初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// 比较并设置为-1,表示正在进行初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 如果要扩容的值小于等于原阀值,或现有容量>=最大值,什么都不用做了
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
4.2 get方法
接下来来看下get方法。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 计算key的hash
int h = spread(key.hashCode());
// 2. 对table是否为空判断,并判断key所对应的桶tab[i]是否存在
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 第一个节点
if ((eh = e.hash) == h) {
// hash相等,key相等,返回值
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash值小于0,去树中查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则,去链表中查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法就相对简单些了,与HashMap的get方法差不多:
- 计算key的hash值;
- 对table进行为空判断,不为空,根据hash定位到table中的位置;
- 先检查tab[i]的头节点是否满足,不满足再去遍历树或者链表;
4.3 size方法
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
size方法用于计算Map中的容量,该方法从JDK1.2就有了,不过不建议使用了,因为推荐了新的方法:mappingCount方法。
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
在mappingCount的注释中,我们可以看出:
- 该方法建议用来代替size()方法使用,因为ConcurrentHashMap可能会包含更多的映射结果,即超过int类型的最大值;
- 该方法返回的是一个估计值,由于存在并发的插入和删除,因此返回值可能与实际值会有出入;
看到源码会发现两者其实调用的都是sumCount方法,看下sumCount的源码:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
最终结果值还是通过属性baseCount计算来的。而baseCount是基于CAS来更新的,所以实际值并不一定是当前Map的真实元素个数。
三、总结
1. 问题
(1)put操作的时候,为什么当hash值为MOVED的时候,桶正在进行扩容操作呢?
通过查看调用轨迹,我们可以看到是类型为ForwardingNode的节点,构造了一个hash值为MOVED的Node,我们来看下这个类:
/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
这个类是一个特殊的Node节点,该节点的hash值是-1,也就是MOVED,该类只会在扩容的时候用到,用于在扩容期间,插入到桶的头部的节点。也可以理解为一个占位符,表示当前节点正在被移动。
2. 总结
JDK1.8中的ConcurrentHashMap的数据结果和HashMap的是一样的,然后通过CAS和 synchronized 关键字来保证线程安全。本次内容只简单了解了ConcurrentHashMap的put和get操作,其他的操作其实步骤都差不太多,等有时间了,再去了解下。
本文参考自:
《Java源码分析》:ConcurrentHashMap JDK1.8
docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
【JUC】JDK1.8源码分析之ConcurrentHashMap(一)
http://www.cnblogs.com/huaizuo/archive/2016/04/20/5413069.html