一、背景
在JDK1.5
版本中引入了一个线程安全高性能Map类—ConcurrentHashMap
,主要为了解决HashMap
线程不安全和Hashtable
效率不高的问题。众所周知,HashMap
在多线程编程中是线程不安全的,而Hashtable
由于使用了synchronized
修饰方法而导致执行效率不高;因此,ConcurrentHashMap
的出现是为了在保证线程安全的前提下,提高性能。
二、设计猜想
在探究源码之前,首先我们先尝试用自己的逻辑来设计下ConcurrentHashMap<K,V>
的数据结构。在我们熟悉的数据结构中有 一种hash
表的数据结构符合ConcurrentHashMap
的存储要求
[散列表](Hash table,也叫哈希表),是根据关键码值(Key value)而直接进行访问的数据结构。也就是 说,它通过把关键码值映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做散列函数, 存放记录的数组叫做散列表。
在设计ConcurrentHashMap
中需要关注的几个方面:
- 由于数组大小、类型一旦确定,编译、运行时都不可修改,必须对
ConcurrentHashMap
的数组动态扩容。 - 通过Key计算得到的
Hash
值,必须保证元素都能正确的被保存,需要考虑hash
冲突的处理。
三、设计思路
Hashtable
效率不高的主要原因是容器操作过程中对整个对象进行加锁,其他线程的任何操作都会被阻塞。解决方式就是从“减少锁的范围”、“引入非阻塞锁”或引入”读写锁“等方面进行优化,针对以上的思路可以进行如下改进:
- 将
Hashtable
整张表拆分成一张张的小表,操作中只有某个小表受影响,而其他小表可以实现高性能操作。 - 小表可以实现非阻塞锁的”读写“锁,在操作小表时,可以减少其他线程的等待时间。
我们暂称以上的设计为“分段锁”技术。
Map的底层数据结构是数组+链表,基于上面的拆表思路,将上面的小表替换成链表的头节点即数组的某个元素。基于以上的方案,可以降低方案一的复杂度和空间使用率。
JDK中提供的ConcurrentHashMap
在1.7和1.8的版本中正是基于这个改进思路进行设计的。
四、分段锁技术
ConcurrentHashMap
通过持有一个Segment[]
数组对象来实现锁分段技术,其中每一个Segment
对象通过继承ReentrantLock
实现持有非阻塞的重入锁。实则一个Segment
对象可以理解为一个HashMap
,不同在于Segment
持有锁,而HashMap
不持有锁。Segment
中持有一个HashEntry[]
数组,HashEntry
是数据节点对象。
ConcurrentHashMap
结构如下图:
4.1 名词术语
字段 | 说明 | 备注 |
---|---|---|
concurrencyLevel | 并发级别 | |
segmentShift | 段偏移量 | |
segmentMask | 段掩码 |
4.2 源码剖析
4.2.1 容器初始化
容器初始化主要是为完成Segment[]
数组对象的初始化,Segment[]
数组的大小又依赖于ssize
的值,ssize
是通过concurrencyLevel
计算得到,其中ssize应该满足2的N次方(power-of-two sizes),所以ssize >=concurrencyLevel,此处对于Segment[]数组的大小的约束是为了通过按位与的散列算法来定位Segments
数组的索引。(以下省略无关代码)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int ssize = 1;
while (ssize < concurrencyLevel)
ssize <<= 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}
4.2.2 设置值
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
-
ConcurrentHashMap
的value不能为空,否则抛出NPE异常。 - 获取key相应的
hash
值,用于计算操作元素所在的segment[]
数组的索引下标j。 - 通过索引拿到相应的
segment
对象,进行put
操作
计算Key的hash值
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String))
return sun.misc.Hashing.stringHash32((String) k);
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
ConcurrentHashMap
中对key的处理采用的是变体的Wang/Jenkins
哈希算法。此算法特点:雪崩性和可逆性。
计算Segment数组下标
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int sshift = 0,ssize = 1;
while (ssize < concurrencyLevel){
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
}
public V put(K key, V value) {
int j = (hash >>> segmentShift) & segmentMask;
}
j指待操作的Segment[]
数组的下标,而其中的segmentShift
和segmentMask
是容器初始化时完成的赋值。其中的ssize
等于Segment[]
数组的长度,而sshift
满足2^sshif=ssize
公式。下标j的值域范围为[0,ssize-1]
,由于ssize=2^sshif
,那么下标j可以用1个sshift
位的二进制数字表示。假如:ssize为16,那么sshift=4,j的值域为[0,15],而[0000b,1111b]
就是j的值域;则求key在Segment[]的下标j,就是求key对应的一个散列的4位二进制数值。而ConcurrentHashMap的源码求下标j的方式非常简单,就是取key的hash值的高4位。因此,求key散列到长度为ssize的Segment数组的下标j,就是求key的hash值的高sshift位。
初始化Segment对象
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
- 利用
UNSAFE
原子操作从Segment[]
数组中尝试获取对应Segment
对象,没有则进行初始化,有则直接返回。 -
Segment
对象的初始化以Segment[]
数组中头节点的参数和UNSAFE
原子操作完成初始化。
存放元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试获取Segment的Lock,获取成功返回null,否则非阻塞重试3次,超过3次则阻塞等待锁,返回对应的链表节点。
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//递归从链表头切换到后续节点
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
- 首先尝试获取
Segment
的Lock,Segment
持有的是ReentrantLock
非阻塞锁,如果失败则重试获取 - 获取链表的头结点,
Segment
持有的HashEntry[]
数组操作的是头结点开始:- 如果头节点e不为空,如果key相同,则更新值,否则执行
e = e.next
进行链表遍历 - 如果头节点为空,则新增节点node,满足
node.next=first
。
- 如果头节点e不为空,如果key相同,则更新值,否则执行
4.2.3 容器扩容
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
4.2.4 获取值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab = s.table)!=null) {
for (HashEntry<K,V> e=(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- 获取key相应的
hash
值,通过计算得到索引u
,从而定位到Segment[]
数组中的Segment
对象。 - 在通过计算得到
HashEntry[]
数组中的HashEntry
对象,再通过循环遍历链表,最终定位到数据。
五、行锁技术
5.1 源码剖析
5.2.1 容器初始化
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
……………………
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
在进行初始化容器时,会设置两个参数,initialCapacity
表示容器数组的容量,loadFactor
表示容器的负载因子。其中tableSizeFor
会对initialCapacity
参数值进行二进制的位左移,使得容量是2的N次方。
5.2.2 设置值
计算Key的hash值
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
其中 hash(key) 方法就是计算key的hash值, (h = key.hashCode()) ^ (h >>> 16) 使用高位16位和低16位异或运算得到Hash值,主要为了使hash分布尽可能的均匀。
[图片上传失败...(image-237caa-1576651270043)]
初始化容器数组
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 通过sc变量来控制初始化过程,
sc=-1
在进行容器初始化,其他线程循环等待并让出CPU(Thread.yield()
) - 初始化
Node[]
数组对象,并返回
存放元素
final V putVal(K key, V value, boolean onlyIfAbsent) {
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 获取容器中
Node[]
数组的头结点,如果头结点为空,则构造新节点,并添加到该头结点上(CAS)。 - 如果不是头节点,则使用
synchronized
锁住头节点,循环进行链表尾部添加
5.2.3 容器扩容
https://www.cnblogs.com/menghuantiancheng/p/10462370.html
5.2.4 获取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) >0&&(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 获取key相应的
hash
值,定位到Node[]数组的索引位,依次循环遍历链表,最终定位到数据。
参考文档:
https://www.cnblogs.com/weiweiblog/archive/2018/09/09/9611271.html