1.7与1.8区别
1.7采用分段锁的概念,如下图所示,每段包含多个节点,并且都是加悲观锁
1.8同样是采用分段锁的思想,只是这次将分段锁的粒度降低到节点级别,并且采用了部分CAS乐观锁的操作,大大提升了并发性能
数据结构沿用了与它同时期的HashMap版本的思想,底层依然由数组+链表+红黑树的方式思想。
有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值
重点变量
/**
* 盛装Node元素的数组 它的大小是2的整数次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
hash表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
Node
Node是ConcurrentHashMap最核心的内部类,每个Node可以理解为数组中的一个节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//带有同步锁的value
volatile Node<K,V> next;//带有同步锁的next指针
get
因为Node的val值域是volatile的,所以无需加锁就可以得到节点的最新值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0 说明这个节点在树上 直接寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put
简洁说明:
- 根据给定的key的hash值找到其在table中的位置index
- 找到位置index后,根据以下情况进行存储或者帮助扩容后存储
- 如果当前正在扩容,则优先帮助扩容
- 如果table[index]位置没有元素,则直接通过CAS存储
- 如果table[i]存储的是一个链表:如果链表不存在key则直接加入到链表尾部;如果存在key则更新其对应的value;如果存入后链表元素>8,还需要将链表转换为红黑树
- 如果table[i]存储的是一个红黑树,则按照红黑树方式插入
其中3跟4需要synchronized对头节点进行加锁
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许 key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//死循环 何时插入成功 何时跳出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据hash值计算出在table里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh〉0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
//在这里遍历链表所有的结点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 则修改对应结点的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}
size
ConcurrentHashMap 中键值对的个数通过求 baseCount 与 counterCells 非空元素的和得到
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
* 当没有争用时,使用这个变量计数。
*/
private transient volatile long baseCount;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
一个 volatile 的变量,在 addCount 方法中会使用它,而 addCount 方法在 put 结束后会调用。在 addCount 方法中,会对这个变量做 CAS 加法
但是如果并发导致 CAS 失败了,使用 counterCells
如果使用 counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功
这种方式目的是降低更新size时的冲突,提升性能
扩容
整个扩容操作分为两个部分
第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的
第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。
先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:
如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上,然后在原table中的i位置放入forwardNode节点
如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreeify,把处理的结果分别放在nextTable的i和i+n的位置上,然后在原table中的i位置放入forwardNode节点
遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。
再看一下多线程是如何完成的:
多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到ForwardingNode节点,就向后遍历。
Key和Value不允许null值
ConcurrentHashmap和Hashtable都是支持并发的,这样会有一个问题,当你通过get(k)获取对应的value时,如果获取到的是null时,你无法判断,它是put(k,v)的时候value为null,还是这个key从来没有做过映射。
HashMap是非并发的,可以通过contains(key)来做这个判断。
而ConcurrentHashMap在调用m.containsKey(key)和m.get(key),这两个方法都是没有加锁的,调用时候m可能被其他线程改变了。
假如一个线程m.containsKey(k)为真,在还没执行m.get(k)的时候,k被另外一个线程给删除了,那么m.get(k)会返回null。如果允许null值的话,就会错误的判断为k还存在;因此不允许null值的话就可以正常的表示出当前的k是不存在的。所以在ConcurrentHashMap不应该有如下的写法,Key和Value不允许null值。
其实Value不允许null值就可以,Key为null似乎没什么影响,作者一起排除null我也不知道什么原因。
if (m.containsKey(k)) {
return m.get(k);
} else {
throw new KeyNotPresentException();
}
总结
- Node级别分段锁
- 读不加锁
- 写不一定需要加锁
- 可多线程扩容
- size计算方式特殊