Lock Stripping
在 Java 中,普通的 HashMap 是非线程安全的,HashTable 是线程安全的 Map。从下面的代码可以看出来,它的线程安全是通过在 get 和 put 方法上加 synchronized 实现的,锁的粒度是整个对象,两个线程都不能同时 get,性能很差。
public class Hashtable<K,V>
extends Dictionary<K,V>
implements Map<K,V>, Cloneable, java.io.Serializable {
public synchronized V get(Object key) {
// 省略具体内容,只看方法定义
}
public synchronized V put(K key, V value) {
// 省略具体内容,只看方法定义
}
}
为了改善并发操作的性能,Java 提供了 ConcurrentHashMap,它使用了锁分离技术,将原先 HashTable 中对象级的大锁,换成了多个小粒度的 Segment 锁,每一个 Segment 都相当于一个 HashTable。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
Unsafe
Java 不能直接访问操作系统底层,而是通过本地方法来访问,Unsafe 类提供了一些硬件级别的原子操作。通常,使用 Unsafe 是为了提升性能。
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class sc = Segment[].class;
SBASE = UNSAFE.arrayBaseOffset(sc);
ss = UNSAFE.arrayIndexScale(sc);
} catch (Exception e) {
throw new Error(e);
}
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
}
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
}
在 ConcurrentHashMap 中,segmentAt 方法用于获取 Segment 数组中第 j 个元素的引用,其中的一些常量用到了 Unsafe 的 arrayBaseOffset、arrayIndexScale、getObjectVolatile 方法,其功能如下表所示:
方法 | 功能 |
---|---|
arrayBaseOffset | 获取数组第一个元素的偏移地址 |
arrayIndexScale | 获取数组的转换因子,也就是数组中元素的增量地址 |
getObjectVolatile | 获取 obj 对象中 offset 偏移地址对应的 object 型 field 的值,支持 volatile load 语义 |
Unsafe 的特性大致可以分为以下几类:
- 对变量和数组内容的原子访问,自定义内存屏障
- 对序列化的支持
- 自定义内存管理/高效的内存布局
- 与原生代码和其他JVM进行互操作
- 对高级锁的支持
更多 Unsafe 方法的功能请看:Java 中的 Unsafe 类详解
自旋
重量级的锁往往引起线程切换,而线程切换非常耗时。在竞争不激烈,锁占用时间较短(比线程切换耗时短)的环境里,通过 CPU 轮询来获取锁,往往效率更高。
在 Segment 的 remove 方法中,修改数据时需要获取锁,这时候并没有直接调用会阻塞的 lock 方法,而是调用了 tryLock,获取锁失败时进入了 scanAndLock 方法。scanAndLock 方法先是通过自旋来尝试获得锁,当自旋次数超过阈值 MAX_SCAN_RETRIES 时,调用 lock 方法来获得锁。
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
// 省略其他
}
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
Sequential Consistency
在 scanAndLock 的文档说明里,提到了 "we must lock even if the key is not found, to ensure sequential consistency of updates",即使 key 不存在,也要加锁,保证更新的顺序一致性。
/**
* Scans for a node containing the given key while trying to
* acquire lock for a remove or replace operation. Upon
* return, guarantees that lock is held. Note that we must
* lock even if the key is not found, to ensure sequential
* consistency of updates.
*/
private void scanAndLock(Object key, int hash) {
Sequential Consistency 即顺序一致性,是一种内存一致性模型,它的定义是:
(并发程序在多处理器上的)任何一次执行结果都相同,就像所有处理器的操作按照某个顺序执行,各个微处理器的操作按照其程序指定的顺序进行。换句话说,所有的处理器以相同的顺序看到所有的修改。读操作未必能及时得到此前其他处理器对同一数据的写更新。但是各处理器读到的该数据的不同值的顺序是一致的。
现在的 CPU 和编译器会对代码做各种各样对优化,有时候为了改进性能而把代码执行顺序打乱,这样可能会导致错误对程序执行结果。为了保证顺序一致性,需要使用 Memory Barrier,一般同步原语(例如锁)会隐式地调用 Memory Barrier 指令。
Trade Offs
Trade Offs 即权衡,鱼与熊掌不可兼得。在一些系统性能优化中,经常使用空间换时间,或时间换空间。在 ConcurrentHashMap 中,也有一些权衡。
读与写
请看如下 Segment 类的 get 方法,其中并没有使用锁,显然效率会很高。那么 ConcurrentHashMap 是如何保证在 put 和 rehash 的过程中,读仍然能正常工作呢?
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
请看 put 方法的第 32 行,每一次 put 都把新的元素放到 HashEntry 链表的头结点,这样就不会破坏链表的结构,读操作就不会失败。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
对于 rehash,请看下面的代码,在复制数据时,并没有去改变原先的 HashEntry 的 next 字段。在新的链表中,是重新 new 了 HashEntry 节点来装载数据。这样一来,旧的链表结构没有被破坏,读操作也就不会失败了。
/**
* Doubles size of table and repacks entries, also adding the
* given node to new table
*/
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
如果 rehash 时,所有 HashEntry 都 new 一个,会生成大量新对象,耗费过多内存。所以上面代码第 24~32 行,做了一些优化,重用了部分旧链表的节点,即代码注释里所写的 “Reuse consecutive sequence at same slot”。重用的关键是,找到旧链表中的一个关键节点 lastRun,该关键节点确保:从 lastRun 到旧链表的尾节点的所有数据,在新链表中的哈希位置一样。
综上所述,ConcurrentHashMap 通过巧妙的设计 put 和 rehash,多消耗了一些内存,换来了不需要加锁的高效的读操作。
一致性与效率
ConcurrentHashMap 的 clear 方法如下所示,其并没有一个全局的锁,这会导致清理完一个 Segment 后,正在清理下一个 Segment 时,前面的 Segment 有可能已经被其他线程放入了数据,这样就会导致 clear 方法结束的时候,ConcurrentHashMap 中可能存在数据。
public void clear() {
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> s = segmentAt(segments, j);
if (s != null)
s.clear();
}
}
也就是说,相对于 HashTable 的强一致,ConcurrentHashMap 是弱一致的。那么弱一致性带来了什么好处呢?因为没有全局锁,从而可以提供更好的读写并发能力。