简介
ConcurrentHashMap是java.util.concurrent
包下的重要成员,也是Java并发容器中使用较为频繁的。
ConcurrentHashMap不是采用和HashTable一样的方法,在访问操作的时候使用synchronized
保证只有一个线程能操作,而通过一种更细粒度的锁来实现锁住部分哈希表,从而提高并发性能。
但是在使用上ConcurrentHashMap是可以与HashTable互操作的,它遵循HashTable的规范,也就是说在Java5之后,随着ConcurrentHashMap的引入,想要提高性能的话可以把使用HashTable的地方替换成ConcurrentHashMap。可以说,ConcurrentHashMap的到来标志之HashTable的过时。
如果性能不是考虑的因素,只需要保证Map的线程安全的话,还可以通过Java的集合工具类获取一个经过包装的SynchronizedMap
,原理就是通过包装器模式(也叫装饰模式)把所有对原Map的操作进行一次synchronized
包装:
Map m = Collections.synchronizedMap(new HashMap())
理解ConcurrentHashMap的前提
由于涉及到并发编程,也就是说我们要理解ConcurrentHashMap的时候需要知道一些Java中提供的并发编程类库和知识体系,下面我主要大概介绍下ConcurrentHashMap需要提前知道的知识点,不做具体深入。
volatile
关键字
Java中使用volatile
关键字来保证变量的修改能够被其他线程知道,读和写一个volatile变量都有全局的排序(也就是不会被重排序,Java内存允许编译器和处理器对指令重排序),volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方。
ReentrantLock
ReentrantLock是Java中用于并发编程的一个可重入的互斥锁,通常称为再入锁。Java5之后提供的锁实现,通过lock()
实现加锁,通过unlock()
实现锁释放。
相对于synchronized
同步锁,支持公平锁和非公平锁来实现一定程度的公平性,代码书写也相对灵活。
Unsafe
JDK中提供了类sun.misc.Unsafe
来支持任意内存地址位置的数据读写和一些CAS(compareAndSwap)原子操作,Unsafe不对用户开放,只对Java核心类库开放,通过判断调用者是否为Bootstrap类加载器加载的类来判断,可用通过反射获取该实例,但是建议在不得已的情况下不要使用Unsafe。
CAS
CAS(compare and swap,比较并交换)是实现原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。
ConcurrentHashMap分析
ConcurrentHashMap随着Java的升级过程中也一直在升级演变,所以这里主要分析大概架构以及对比Java7和Java8中ConcurrentHashMap的变化。
ConcurrentHashMap结构
ConcurrentHashMap的大致结构如下,通过Segment对Map进行分段保存,当操作的时候只需要对对应的Segment进行锁定即可(Segment自身继承了ReentrantLock),不用像HashTable一样对整个链表锁定,进而大大提高性能。
ConcurrentHashMap In Java7
几个重要常量
// 如果没有指定Map大小,默认为16
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 如果没有指定默认的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认的并发等级,也就是Segment数组的大小
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 该Map的最大容量,即2的30次方
static final int MAXIMUM_CAPACITY = 1 << 30;
// Segment中HashEntry数组的最小长度,必须是2的倍数,最小是2为了避免懒加载构造之后下次调用会立即扩容
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// Segment数组的默认最长长度(2的16次方),必须是2的倍数,Segment最大是`1<<24`(2的24次方)
static final int MAX_SEGMENTS = 1 << 16; //比较保守
// 在非同步操作的情况下的重试次数(size,containsValue),避免在哈希表在连续修改的情况下出现无限重试
static final int RETRIES_BEFORE_LOCK = 2;
创建&初始化
- ConcurrentHashMap构造函数
// 三个参数的构造函数(初始化大小,加载因子,并发等级)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
// 两个参数的构造函数(初始化大小,加载因子)
public ConcurrentHashMap(int initialCapacity, float loadFactor)
// 一个参数的构造函数(初始化大小)
public ConcurrentHashMap(int initialCapacity)
// 无参构造函数
public ConcurrentHashMap()
// 通过其他Map创建
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
其中,需要关心的三个参数的构造函数,其他构造函数都是自定义+默认值(上述常量)最终调用三个参数的构造函数创建
- 初始化
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 检查
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并发等级不能超过MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 根据concurrencyLevel计算Segment的长度,必须是2的倍数
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 保存segmentShift和segmentMask用于索引Segment
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
// 初始大小不能超过MAXIMUM_CAPACITY
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 计算每个Segment大小,初始化大小除以segment数组长度
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 计算每个Segment初始化HashEntry大小,最小为MIN_SEGMENT_TABLE_CAPACITY
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建Segment数组,和第一个Segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 使用UNSAFE写入Segment数组
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
其中,为何在初始化期间创建第一个Segment是因为后面的Segment的创建都是以segments[0]作为模板创建的
get操作
get操作只是获取操作,这里没有任何同步代码或者锁,支持对数据的可见性
public V get(Object key) {
// 手动集成访问方法以减少开销
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 获取hash值
int h = hash(key);
// 通过位运算计算散列所在Segment
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 通过UNSAFE获取对应的Segment,getObjectVolatile方法能保证其可见性
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 遍历获取到对应的值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
// 找不到返回空
return null;
}
put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//和get操作一样定位到Segment
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//通过Segment的put操作保存
return s.put(key, hash, value, false);
}
所以,重点的逻辑在Segment的put方法中:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 无论如何获取锁,首先尝试获取锁,如果获取不到通过scanAndLockForPut获取锁
// scanAndLockForPut会通过while循环获取锁,直到获取到锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//通过hash值定位到对应链表即HashEntry
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);//定位到具体HashEntry
for (HashEntry<K,V> e = first;;) {
if (e != null) {
//如果已经存在相同key,更新value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不存在key则新增一个元素
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//如果已经超过阈值则执行rehash进行扩容,然后保存
//注意,扩容只是在Segment中扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
size操作
ConcurrentHashMap中的size是一个比较有意思的操作,因为获取Map大小是一个比较常用的方法,如果不加锁获取可能会不准确因为同时可能有其他线程在操作map,如果加锁获取将会比较耗时,这里采用了通过指定重试次数(RETRIES_BEFORE_LOCK)(2次)
的方法来获取大小;具体就是在在重试次数内比较两次获取的size,如果一直则认为是准确值,如果不一致进行锁定然后再获取size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//如果达到重试次数,锁定
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//计算size
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//比较两次获取的结果是否一致
if (sum == last)
break;
last = sum;
}
} finally {
//如果锁定,则释放锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
ConcurrentHashMap在Java8新变化
ConcurrentHashMap在Java8中的已经发生比较大的变化,基本的结构和Java8的HashMap差不多,同样的当链表长度超过8的时候使用红黑树;使用volatile
保证可见性,Segment已经被取消,保留Segment只是为了反序列化的兼容,初始化改为懒加载,并发控制精确每个链表,使用synchronize
同步锁代替再入锁,使用CAS操作等。
数据结构
put操作
get操作就是类似HashMap的检索比较,这里重点关注put操作,看看在Java8中是如何实现并发操作的,其他操作暂不展开
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果是第一次put,进行初始化(懒加载)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果链表为空,使用CAS进行无锁put
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//如果处于扩容阶段,
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//这里在链表/树的第一个元素加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//如果是链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果是红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//如果是长度达到TREEIFY_THRESHOLD(默认是8),执行树化操作
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
懒加载初始化
这里可以看到使用CAS操作,实现table的初始化,并且使用双重检查(double-check)
保证重复初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//进入循环检查
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//如果正在初始化,等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//进入初始化操作
try {
//再次检查
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
总结
可以看到,随之Java平台的升级,ConcurrentHashMap也是逐渐演变,但是都是为了更好的达到ConcurrentHashMap的设计目标:
- 在最小化实现更新操作的同时保持并发的可读性
- 是空间消耗到达和HashMap差不多或者更好
ConcurrentHashMap还有续续多多的实现细节,需要仔细阅读源码才能理解,并且涉及到许多底层操作的实现,能力有限,暂时能理解的就这么多,后续希望继续深入理解。