本文不是讨论ConcurrentHashMap的数据结构,而是讨论它的内存语义和并发操作。
ConcurrentHashMap的三个并发目标:
(写者指的是会修改数据结构(链表/红黑树)的线程)
- 任何时刻读者都不需要等待
- 如果在时刻A写者 完成了 修改,在时刻B读者 开始 读,而且时刻B晚于时刻A,那么读者将会看到这些修改(i.e. Happens-before) 。所以,我们也可以说:在时刻A写者 开始 修改,在时刻B读者 开始 读,而且时刻B晚于时刻A,那么读者可能看到这些修改,也可能看不到。
- 任何时刻不会有两个写者。
预备知识(本文假设你都已经了解这些知识):
- 第二个目标其实就是一个Happens-Before关系。Happens-Before关系是一个内存语义,用volatile变量就可以获得这种内存语义。
- 如果一个类含有final域,那么这个类的对象不可能是部分构造的。
- CAS操作
如何实现这三个并发目标:(分两种情况)
bin是链表:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
...
对于第一个目标,get()函数中直接就是不加锁地遍历整个链表
public V get(Object key) {
...
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
...
}
对于第二个目标,可以分成三种情况:
- 修改一个已存在的结点的val域
由于val是一个volatile变量,第二个目标已达成。 - 在链表的末尾插入一个结点。比如现在需要将B插入到end的后面,那么:
next域也是一个volatile变量,而且B一定是一个完全构造好的对象,第二个目标也达成。Node<K, V> B = new Node<>(.., .., .., null); end.next = B;
- 删除一个已存在的结点,比如有:...->A->B->...,现在要删除B,那么:
和插入一样,next域也是一个volatile变量,第二个目标也达成。A.next = B.next;
对于第三个目标,每次写者都需要获得bin锁(以bin的第一个结点作为这个bin的锁)才能进行修改操作。还有一个细节,如果这是一个空bin(i.e. bin的第一个结点为null),那么直接用CAS操作将新创建出的结点赋给bin的第一个结点。
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
f是一个Node<K,V>类型的对象,它现在指向是bin的第一个结点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
...
else { 进入这里说明键key不存在,需要插入一个结点
synchronized (f) { 加锁
if (tabAt(tab, i) == f) { 确认f仍然是bin的第一个结点
if(f是链表结点)
...链表的插入操作
else if (f instanceof TreeBin)
...红黑树的插入操作(在后面分析,在这里只需要记住,即使是树插入,也要先获得bin锁
}//if
}//synchronized
}//else
...
bin是红黑树(重点):
有必要先知道ConcurrentHashMap是怎样表示红黑树的:
如果一个bin是红黑树结构,那么这个bin的第一个结点是TreeBin类型的(正如上面的代码所示)。TreeBin中不存放key-value对,root指向真正红黑树的根,而它的first指向链表的第一个结点。在这里要注意:即使树化后,ConcurrentHashMap仍然维持链表结构,因为next域是在父类Node<K,V>中。所以,可以这样说:如果这个bin树化了,那么既可以用红黑树来遍历它的结点,也可以用链表来遍历它的结点。还有一个细节就是,TreeNode<K,V>中有一个prev,用来(在链表结构中)保存前一个结点,在用红黑树的删除算法时会用到。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
....
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
...
}
回到并发,先谈总体策略:(下面都假设bin已树化)
- 与链表结构的bin不同,如果一个写者正在修改红黑树结构,那么读者不能用红黑树来读。——因为这样没有保证第二个目标。Doug Leg给出了以下注释:
* TreeBins also require an additional locking mechanism. While
* list traversal is always possible by readers even during
* updates, tree traversal is not, mainly because of tree-rotations
* that may change the root node and/or its linkages.
再结合Java对java.util.concurrent.collection的说明:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-levelsynchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
红黑树的插入/删除是有可能旋转树的,如果写者和读者能并发执行,有可能发生这样一种情况:假设结点N已存在于红黑树中。写者插入M结点,读者读N结点。因为写者旋转了树,导致读者读不到N。这种情境下placing an object(N) into collection Happens-Before access that element(N)的关系不成立。所以,读者和写者一定不能并发执行。
- 如果一个写者正在做修改,读者就用链表结构去遍历(这会慢一点,而且没有利用红黑树)。这就对应了第一个并发目标。
- 现在,其实就是一个读者-写者问题。
3.1 同一时刻允许多个读者进行读。
3.2 只要有读者在读,写者就不能进入,此时写者就变成了等待者,它在等待最后一个读者离开。
3.3 最后一个离开的读者必须唤醒等待者(如果有的话)
3.4 只有一个写者(Recall: bin锁)
其实这个问题可以用信号量非常优雅地解决,作为热身,先看一下CS:APP给出的解法。
/*
* Readers-writers solution with weak reader priority
* From Courtois et al, CACM, 1971.
*/
#include "csapp.h"
int readcnt; /* Initially = 0 */
sem_t mutex, w; /* Both initially = 1 */
void reader(void)
{
while (1) {
P(&mutex);
readcnt++;
if (readcnt == 1) /* First in */
P(&w);
V(&mutex);
/* Critical section */
/* Reading happens */
P(&mutex);
readcnt--;
if (readcnt == 0) /* Last out */
V(&w);
V(&mutex);
}
}
void writer(void)
{
while (1) {
P(&w);
/* Critical section */
/* Writing happens */
V(&w);
}
}
mutex可以理解为访问readcnt的锁,每次访问readcnt之前都要去获取这个锁。w作为写锁,写者只有获得这个锁才可以进入。这里的思想是:第一个进入的读者首先去获取写锁,这样写者就无办法进入了(于是,它变为等待者)。最后一个离开的读者必须释放写锁(然后等待者就可以进入了)。
为什么要用两个锁呢?只用一个锁会怎样?(这部分可能有点啰嗦了)
/*
* Naive readers-writers solution
*/
#include "csapp.h"
/* Global variable */
sem_t w; /* Initially = 1 */
void reader(void)
{
while (1) {
P(&w);
/* Critical section: */
/* Reading happens */
V(&w);
}
}
void writer(void)
{
while (1) {
P(&w);
/* Critical section: */
/* Writing happens */
V(&w);
}
}
上面这样的写法,虽然也可以保证读者和写者不会并发执行,但也使得多个读者不能并发执行。OK,下面来看ConcurrentHashMap的解法...
下面的代码有点复杂,用了位操作,留意注释,涉及到TreeBin中的:
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; //set while holding write lock
static final int WAITER = 2; //set when waiting for write lock
static final int READER = 4; //increment value for setting read lock
分两个角度 (以下:‘我’指当前线程)
读者
- 如果已有等待者进入,或已有写者进入,那么我无法进入红黑树,只能按链表读 ——for循环中第一个if
- 如果既没有等待者,又没有写者,那么尝试将读者的数量加一,并进入——for循环中的else if
- 读完后,我尝试离开。如果尝试离开成功,而且发现自己是最后一个读者,而且有等待者在等待,那么将等待者唤醒。——for循环中的最后一个if
final Node<K,V> find(int h, Object k) {
if (k != null) {
first是链表的第一个结点,而且它是'volatile'修饰的
当走到链表末尾时,退出循环
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e是我们要找的结点吗)
return e;
e = e.next; 链表遍历操作
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null)); 红黑树读
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w); 使等待者可以被调度
}
return p;
}
}
}
return null;
}
上面的全部是CAS操作,所以我用“尝试”这个字眼,如果尝试失败,那么下一次循环再试。而waiter, lockState是volatile变量,它们一定是最新的。
写者
- 所有的修改方法(removeTreeNode(), putTreeVal())都调用lockRoot()方法
- lockRoot()中首先判断有无读者?有无等待者?有无写者?如果都无,我进入,成为写者。如果其中有任意一个,那么调用contendedLock()——竞争锁
- 如果有一个等待者,或没有读者,那么我就进入成为写者。——for循环中的第一个if
3.1关键:任何时刻,如果有等待者,就不可能有写者
3.2 任何时刻,如果有读者,就不可能有等待者,也不可能有写者
3.2 如果在这一个判断中发现已存在一个等待者,其实就是当前线程自身。因为它之前尝试进入失败,所以成为了等待者。现在没有读者,它也就能成为写者了。 - 如果尝试成为写者失败,又没有等待者,那么我就要成为等待者。——for循环中的else if
/**
* Acquires write lock for tree restructuring.
*/
private final void lockRoot() {
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
}
/**
* Possibly blocks awaiting root lock.
*/
private final void contendedLock() {
boolean waiting = false;
for (int s;;) {
if (((s = lockState) & ~WAITER) == 0) {
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
if (waiting)
waiter = null;
return;
}
}
else if ((s & WAITER) == 0) {
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
waiter = Thread.currentThread();
}
}
else if (waiting)
LockSupport.park(this); 禁止等待者被系统调度,等待者需要被最后一个离开的读者显式地唤醒
}
}
回顾和对比
ConcurrentHashMap中的算法和CS:APP的思想都是一样的,只不过用了非阻塞算法(CAS),使得它复杂很多。比如,一个线程想尝试获取某个共享变量,如果用阻塞算法(同步 / 信号量),那么就是尝试获取,如果失败,则阻塞(被挂起)。如果使用的CAS,那么如果尝试失败,下一次再尝试。
所以使用CAS,会有多个状态。而用同步,状态就比较少。例如,在ConcurrentHashMap中,有可能出现这样一个状态:lockState == WAITER,这说明没有读者,没有写者,只有一个等待者(PS:这个状态在contendedLock()的for循环的第一个if中被处理)。你可能会对这个状态感到疑惑,既然没有读者,那么为什么这个线程不成为写者,却成为等待者呢?其实,只要考虑一下什么时候会出现这个状态,就很容易理解了。
1. 当一个想修改红黑树数据结构的线程尝试进入时,发现已经存在读者。它尝试进入失败,于是,它尝试成为等待者,尝试成功,它真的成为等待者了。2. 之后,过了一段时间,最后一个读者尝试离开,如果尝试离开成功了,那么保证会唤起等待者。3. 等待者醒了,此时在等待者眼中,就会出现上述状态
现在在看下CS:APP的解法,发现是真的简单啊。。。
补充:
CAS一般都会结合volatile变量一起使用,CAS实现原子性,而volatile实现内存可见性(i.e.happens-before)。参卡AtomicInteger,AtomicReference等
附加
resize
* The table is resized when occupancy exceeds a percentage
* threshold (nominally, 0.75, but see below). Any thread
* noticing an overfull bin may assist in resizing after the
* initiating thread allocates and sets up the replacement array.
* However, rather than stalling, these other threads may proceed
* with insertions etc. The use of TreeBins shields us from the
* worst case effects of overfilling while resizes are in
* progress. Resizing proceeds by transferring bins, one by one,
* from the table to the next table. However, threads claim small
* blocks of indices to transfer (via field transferIndex) before
* doing so, reducing contention. A generation stamp in field
* sizeCtl ensures that resizings do not overlap. Because we are
* using power-of-two expansion, the elements from each bin must
* either stay at same index, or move with a power of two
* offset. We eliminate unnecessary node creation by catching
* cases where old nodes can be reused because their next fields
* won't change. On average, only about one-sixth of them need
* cloning when a table doubles. The nodes they replace will be
* garbage collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of concurrently
* traversing table. Upon transfer, the old table bin contains
* only a special forwarding node (with hash field "MOVED") that
* contains the next table as its key. On encountering a
* forwarding node, access and update operations restart, using
* the new table.
resize时的并发
resize是通过transfer完成的,在resize时,仍然能达成三个并发目标。读操作(get)线程不会helpTransfer,但是所有写操作线程(put remove)都会helpTransfer。resize时,不会改动原table,所以读线程继续读原table是安全的。而且,因为此时任何写线程都在helpTransfer,它们的写操作都没有完成,所以读线程即使读不到它们的修改,也是允许的(符合happens-before)。
数据结构如何变化
table是Bin数组,每一个Bin装Node,每个Node装一个<key, value>
invariants:
key.hashCode() --> compute hash --> hash % table.length作为table数组的下标。所以,resize时,要重新决定每一个结点要放到哪一个Bin中。
tricks to speed up
Resizing proceeds by transferring bins, one by one,
from the table to the next table. However, threads claim small
blocks of indices to transfer (via field transferIndex) before
doing so, reducing contention. A generation stamp in field
sizeCtl ensures that resizings do not overlap
Because we are using power-of-two expansion, the elements from each bin must either stay at same index, or move with a power of two offset.
We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields won't change.