Java 容器类 - Map
sschrodinger
2019/03/24
参考
《算法》第四版 - Robert.S 著,谢路云 译
AbstractMap
EntrySet
类似的,AbstractMap
实现了 Map
的基础框架,在这个框架中,最重要的类是 EntrySet
结构。Map
实现不支持迭代器,EntrySet
将键值对包装成一个 Set,这样就可以对 Set 做迭代,实现对 Map 的遍历。
举例来说,containsValue(Object value)
函数在内部使用 entrySet 的迭代器遍历整个 Map,如果找到就返回 true,否则返回 false。
public boolean containsValue(Object value) {
Iterator<Entry<K,V>> i = entrySet().iterator();
if (value==null) {
while (i.hasNext()) {
Entry<K,V> e = i.next();
if (e.getValue()==null)
return true;
}
} else {
while (i.hasNext()) {
Entry<K,V> e = i.next();
if (value.equals(e.getValue()))
return true;
}
}
return false;
}
键视图和值视图
和 List 的 subList 类似,Map 支持返回键视图和值视图。因为 Map 的值要求唯一,而 value 不做要求,所以 Map 的键视图采用 Set 存储,值视图采用 Collection 存储,两个视图如下所示:
transient Set<K> keySet;
transient Collection<V> values;
懒加载模式
所谓的懒加载模式,即在创建类的时候不创建视图,而是在使用视图的时候,如调用 keySet()
函数时才创建视图。
如下,当 keySet 不为空时,直接返回 keySet;如果为空,才创建 keySet。
public Set<K> keySet() {
Set<K> ks = keySet;
if (ks == null) {
ks = new AbstractSet<K>() {
//...
};
keySet = ks;
}
return ks;
}
受限制的视图
和 AbstractList 的 subList 所不同的是,AbstractMap 提供的键视图和值视图是受限制的视图。我们可以在 subList 中删除元素,增加元素,取决于他的原 List 可以支持哪些操作,但是 AbstractMap 的键视图和值视图只支持部分操作。
KeySet 的实现重写了 AbstractSet,但是在 AbstractSet 中,所有的操作都被设置成了抛出 OperationNotSupportException,所以只有其重写的函数和该 Map 的实现所绑定。
下面是 AbstractMap 的 keySet 实现。
new AbstractSet<K>() {
public Iterator<K> iterator() {
return new Iterator<K>() {
private Iterator<Entry<K,V>> i = entrySet().iterator();
public boolean hasNext() {
return i.hasNext();
}
public K next() {
return i.next().getKey();
}
public void remove() {
i.remove();
}
};
}
public int size() {
return AbstractMap.this.size();
}
public boolean isEmpty() {
return AbstractMap.this.isEmpty();
}
public void clear() {
AbstractMap.this.clear();
}
public boolean contains(Object k) {
return AbstractMap.this.containsKey(k);
}
};
HashMap
特性
HashMap
是平常用的最多的一个 Map 实现,我们列出其几大特性:
特性
- 基于 Hash table(哈希表)的实现
- 提供所有 map 接口,允许 null 的键和 null 的值
- 非线程安全
- 不保证读取键值对的顺序,不仅仅包括键值对 put 和 get 顺序不同,也包括随时间变化,迭代的顺序也可能不同。
- 对于基本的操作(get 和 put),可以在常数时间上完成。
HashMap 实现原理
散列表 的作用相当于索引,通过散列表可以快速的定位元素的位置。散列表依据如何解决冲突可以划分为多种散列表,在 HashMap 的实现中,采用单独链表法解决冲突。
首先看 Hash table 的原理,简要的概括, hash table 的思想就是分区。
一个简化版的 hash map 如下图所示:
对于需要保存的每一个 entry,求出其 hash 值,并与 hash table 的大小做取模运算(保证每一个 hash 值都可以映射到 hash table 中),然后,将取模运算之后的 hash 值对应的 entry 连接到 hash table 对应的项中,如果已经有元素,则将其连接到已有元素的后面。
hashMap
采用 Node
类作为其 entry 节点,Node
类如下所示:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return value; }
public final String toString() { return key + "=" + value; }
public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}
public final V setValue(V newValue) {
V oldValue = value;
value = newValue;
return oldValue;
}
public final boolean equals(Object o) {
if (o == this)
return true;
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
if (Objects.equals(key, e.getKey()) &&
Objects.equals(value, e.getValue()))
return true;
}
return false;
}
}
Node
实际上是一个链表节点,这样才能实现 hash table 的链表引用。
在 HashMap 的实现中,采用 Node 数组作为哈希表结构。
结构如下图:
transient Node<K,V>[] table;
举个例子,我们使用如下程序初始化:
private void mapTest() {
Map<String, String> stringMap = new HashMap<>();
stringMap.put("key_1", "value_1");
stringMap.put("key_2", "value_2");
stringMap.put("key_3", "value_3");
stringMap.put("key_4", "value_4");
}
初始化之后,存储的结构如下图所示:
我们来看 HashMap 具体怎么实现存储。
put 过程
根据散列表原理,对每一个加入散列表的元素都要求他的哈希值找到对应的存储位置。在 hashMap
中,我们使用 key 查找值,所以在散列表中,只要能根据 key 的 hash 值快速找到 Node 节点,就可以定位 value 值,所以 hashMap 使用 key 的 hash 值作为 hash table 的索引。
以下是 put 函数的部分源代码:
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
//onlyIfAbsent:if true, don't change existing value
//evict:if false, the table is in creation mode
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
// step 1: resize if necessary
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// step 2: if no entry in current tab, create a new node
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// step 3: check if first entry key equals key
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else {
// step 4: this loop state search entry key equals key in s specific hash or create a new entry if no equals entry found
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// step 5: modifify if exist
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
return null;
}
最主要的函数是 putVal 函数,他把 key 的哈希值,key 值和 value 值最为参数,总共分为五步,如下所示:
put 步骤
- step 1:如果 table 为空或者 table 长度为 0,重新配置 table
- step 2:利用
(n - 1) & hash
找到 entry 所在的链表头。如果链表头为空,直接创建新节点。
- 用
n - 1
和 hash 做与操作,可以将结果限制在 0 到 n - 1 中,优点是速度快,缺点是相当于只有低位参加了 hash 的过程,导致碰撞几率增大。
- step 3:检查链表头的 key 是否等于所期望的 key。如果一致,记录当前 node
- 需要满足两个条件
p.hash == hash
和(k = p.key) == key || (key != null && key.equals(k))
- 第一个条件指的是同一个键的 hash 结果应该维持不变,所以先检查 hash 结果,如果 hash 结果不一样,则键肯定不一样(一致性)
- 第二个条件是指满足所期望的 key 和链表头的 key 为同一对象(同一内存)或者 equal 函数相同
- step 4:遍历链表,如果满足当前遍历的节点和期望的 key 相同,记录当前 node,break;如果链表中没有满足要求的 node,新建节点在链表最后。
- step 5:对于 step 3 和 step 4,如果存在满足 key 条件的节点,表明在原来的链表中有记录,根据 onlyIfAbsent 参数决定是否更新。
get 过程
get 过程比较简单,根据 key 值遍历 Map,如果有元素,则返回值,如果没有,则返回空,代码如下:
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
hash 函数
因为在 get 方法时使用了 (n - 1) & hash
,而不是取模运算,相当与只有低位参加了运算,所以碰撞几率相当的高,为了减少碰撞几率,hashMap 使用了一个支持方法 hash ,将 key 值 hash 的高位和低位做与或运算,使得高位也参加到 hash 的计算中,减少碰撞几率。
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
resize 实现
每当哈希表的使用率大于 $loadFactory * capicity$
时,会自动扩大哈希表,标准是每次扩大两倍。
扩大两倍可以很好的解决元素重新排列的问题。我们知道 hashMap 使用 $ (n - 1) \& hash$
作为哈希和表的映射,每一个元素需要调整的位置只能是当前位置或者是当前位置加上原来容量之后的位置,高位决定了是当前位置还是两倍之后的位置。
假设当前容量大小为 $16$
($0x10000$
),某一元素的哈希值为 $44$
($0x101100$
),不考虑内置的 hash 函数,直接将 15 和 44 做与操作,那么
会得到 $0x1100$
,即在原来应该放在 12 号位。容量扩大两倍,实际上是对 16 向左移动了 1 位,得到 32,当和 44 做与操作时,实际上前 4 位不会变动,只有第五位可能有区别,在这个例子中,做与操作仍然是放在 12 号位,当然第五位可能是 1,变成当前位置加上原来容量之后的位置。这样就不用重新计算每一个的位置,而只需要计算高位不同的 hash 值并将存放位置加上当前容量的偏移。
note
- 最大 table 长度为
$1 << 30$
代码如下:
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
迭代器构建
HashMap 的迭代器都基于抽象类 HashIterator
,都是快速失败迭代器。
HashIterator
基于深度遍历的思想,首先遍历一个链表结构,然后遍历下一个链表结构。
代码如下:
abstract class HashIterator {
Node<K,V> next; // next entry to return
Node<K,V> current; // current entry
int expectedModCount; // for fast-fail
int index; // current slot
HashIterator() {
expectedModCount = modCount;
Node<K,V>[] t = table;
current = next = null;
index = 0;
if (t != null && size > 0) { // advance to first entry
// 将 next 指向第一个存在的(不为空)的链表
do {} while (index < t.length && (next = t[index++]) == null);
}
}
public final boolean hasNext() {
return next != null;
}
final Node<K,V> nextNode() {
Node<K,V>[] t;
Node<K,V> e = next;
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
if (e == null)
throw new NoSuchElementException();
if ((next = (current = e).next) == null && (t = table) != null) {
do {} while (index < t.length && (next = t[index++]) == null);
}
return e;
}
public final void remove() {
Node<K,V> p = current;
if (p == null)
throw new IllegalStateException();
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
current = null;
K key = p.key;
removeNode(hash(key), key, null, false, false);
expectedModCount = modCount;
}
}
在 HashIterator
的基础上,hashMap 实现了自己的两个迭代器,如下:
final class KeyIterator extends HashIterator
implements Iterator<K> {
public final K next() { return nextNode().key; }
}
final class ValueIterator extends HashIterator
implements Iterator<V> {
public final V next() { return nextNode().value; }
}
final class EntryIterator extends HashIterator
implements Iterator<Map.Entry<K,V>> {
public final Map.Entry<K,V> next() { return nextNode(); }
}
Java 1.8 性能改进
HashMap 最重要的三个参数, threshold 、 loadFactor 和 capacity。
capcity 指的是 table 的大小。
loadFactory 是一个比例,指 table 的填充比例,即当 table 中的元素填充个数大于 $ capacity * loadFactor$
时(数组中有 $ capacity * loadFactor$
个项不为空),则扩充节点。
不同于低版本的 HashMap 实现, 1.8 增加了 threshold 指的是当单链表的长度大于 threshold 时,将单链表重新组合成红黑树形式的存储结构,增加读取效率。详细原理可参见 treeMap
treeMap
特性
TreeMap
是基于红黑树实现的一个 map,有几大基本特性。
特性
- 基于红黑树实现
- 提供所有 map 接口。
- 非线程安全
- 迭代时的顺序按照键值排列,即存储顺序是有限的(在这个基础上,同时实现了
NavigableMap
接口,用于查找某个 key 之前的所有元素等操作)- 对于 put 操作,时间复杂度是
$O(logn)$
,对于增加和删除操作,时间复杂度不能保证- 支持
SortedMap
接口,如 firstKey(),lastKey(),取得最大最小的key,或sub(fromKey, toKey), tailMap(fromKey) 剪取Map的某一段
TreeMap 实现原理
2-3 查找树
利用树进行查找时,希望树尽量的平衡,这样才能够保证在每一次的查找保证 $O(logn)$
的复杂度。在动态插入的过程种要维持平衡二叉树的代价太高,所以使用一种新型的平衡树 - 2-3 查找树。
对于一个二叉查找树,他的每一个节点有一个值和两条连接,左连接指向的二叉查找树的值都小于该节点,右连接指向的二叉查找树的值都大于该节点,对于一个整数类型的数组 int[] a = new int[] {1,2,3,4,5,6,7}
,他所构成的平衡二叉查找树如下所示:
现引入 2-3 查找树,定义如下:
定义
- 为一棵空树或由以下节点组成
- 2-节点,含有一个值和两条连接,左连接指向的 2-3 树中的值都小于该节点,右连接指向的 2-3 树的值都大于该节点(类似于查找二叉树)
- 3-节点,含有两个值和三条连接,左连接指向的 2-3 树中的值都小于该节点,中连接指向的 2-3 树的值位于该节点的两个值之间,右连接指向的 2-3 树的值都大于该节点
对于一个字符数组 char[] chars = new char[] {A,C,H,L,P,S,X,E,J,R,M}
,他的平衡 2-3 树如下所示:
查找
2-3 树查找过程和二叉树相似,略。
添加
在一个只有根节点且是 2-节点的树上添加元素。为了保证平衡,我们需要将该节点替换成一个 3-节点,如下所示:
在一个只有根节点且是 3-节点的树上添加元素。为了保证平衡,我们需要将该节点做局部变化,操作如下:首先将该节点临时增加一个值变成 4-节点,然后对四节点进行拆分,变成 3 个 2-节点,如下所示:
在一个父节点且是 2-节点,该节点是3-节点的树上添加元素。为了保证平衡,我们需要将该节点做局部变化,操作如下:首先将该节点临时增加一个值变成 4-节点,然后对四节点进行拆分,变成 3 个 2-节点,最后将一个 2-节点 和 父节点合并,如下所示:
在一个父节点是 3-节点,该节点是3-节点的树上添加元素。为了保证平衡,我们需要将该节点做局部变化,操作如下:首先将该节点临时增加一个值变成 4-节点,然后对四节点进行拆分,变成 3 个 2-节点,最后将一个 2-节点 和 父节点合并然后递归的对父节点进行操作,直到根节点或者父节点是 2-节点停止,如下所示:
删除
参见 mengzhisuoliu 博客
由此可见, 2-3 树是由下向上生长的,但是删除操作需要对树进行从上和从下两方面的判断,相对来说,删除非常费时。
红黑树
红黑树是一种 2-3 平衡树的实现。不用去定义特殊的新的数据结构,只需要一些附加信息,就可以实现 2-3 树的构建。
在红黑树种,利用黑连接表示 2-3 树的普通节点, 红连接将两个 2-节点连接够成一个三节点。
一个 2-3 树可以化成一个等效的红黑树,如下图所示:
定义
- 红连接均为左连接
- 没有任何一个节点同时和两条连接相连
- 该树是黑色完美平衡的,即任意空连接到根节点的路径上的黑连接数量相同。
我们定义节点上存在 color 属性,代表的是指向该节点的连接是什么颜色。
红黑树基本的操作是旋转,在一些实现中,某些操作可能会出现红色右连接或者两条连续的红连接,我们定义左旋转是将一条红色的右连接旋转得到一条左连接。右连接相反,如下图所示:
左旋转的伪代码如下所示:
Node rotateLset(Node h) {
Node x = h.right;
h.right = x.left;
x.left = h;
x.color = h.color;
h.color = red;
}
只需更改他的 color 属性为红,并将他原来自身的 color 属性赋值给右连接节点就行。
同理,右连接示意图如下:
NOde rotateRight(Node h) {
node x= h.left;
h.left = x.right;
x.right = h;
x.color = h.color;
h.color = read;
}
对于每一个插入,都插入一个红连接。
向单个 2-节点中插入新键。如果新键小于老键,则增加一个红色左连接,否则增加一个红色右连接并进行左旋转,两种情况都能产生一个等效的3-连接,如下所示:
向树底部的 2-节点中插入新键。总是增加一个新的红色连接,如果他的父节点是 2-节点,那么按照如上两种方式调整节点就行。
向一棵双键树(3-节点)中插入新建,分为了三种子情况,第一种情况是新键最大,第二种情况是新键在中间,第三种情况是新键最小。
如下如所示:
对于一个节点和两个红色连接直接相联,这种情况等效于一个 4-节点,当将这两个红色连接变黑时,需要将父节点由黑变红,因为这样的变换会在父节点产生一个 3-节点,理由如下:
每产生一个红色连接都会向上传递直到根节点。
具体实现
如下代码展示了红黑树在 Map 中的存储节点。
static final class Entry<K,V> implements Map.Entry<K,V> {
K key;
V value;
Entry<K,V> left;
Entry<K,V> right;
Entry<K,V> parent;
boolean color = BLACK;
Entry(K key, V value, Entry<K,V> parent) {
this.key = key;
this.value = value;
this.parent = parent;
}
public K getKey() {
return key;
}
public V getValue() {
return value;
}
public V setValue(V value) {
V oldValue = this.value;
this.value = value;
return oldValue;
}
public boolean equals(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
return valEquals(key,e.getKey()) && valEquals(value,e.getValue());
}
public int hashCode() {
int keyHash = (key==null ? 0 : key.hashCode());
int valueHash = (value==null ? 0 : value.hashCode());
return keyHash ^ valueHash;
}
public String toString() {
return key + "=" + value;
}
}
以 boolean color
存储指向该节点的连接的颜色。
每当增加一个元素后,调用 fixAfterInsect
函数对红黑树进行修正,如下所示:
private void fixAfterInsertion(Entry<K,V> x) {
x.color = RED;
while (x != null && x != root && x.parent.color == RED) {
if (parentOf(x) == leftOf(parentOf(parentOf(x)))) {
Entry<K,V> y = rightOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
setColor(parentOf(x), BLACK);
setColor(y, BLACK);
setColor(parentOf(parentOf(x)), RED);
x = parentOf(parentOf(x));
} else {
if (x == rightOf(parentOf(x))) {
x = parentOf(x);
rotateLeft(x);
}
setColor(parentOf(x), BLACK);
setColor(parentOf(parentOf(x)), RED);
rotateRight(parentOf(parentOf(x)));
}
} else {
Entry<K,V> y = leftOf(parentOf(parentOf(x)));
if (colorOf(y) == RED) {
setColor(parentOf(x), BLACK);
setColor(y, BLACK);
setColor(parentOf(parentOf(x)), RED);
x = parentOf(parentOf(x));
} else {
if (x == leftOf(parentOf(x))) {
x = parentOf(x);
rotateRight(x);
}
setColor(parentOf(x), BLACK);
setColor(parentOf(parentOf(x)), RED);
rotateLeft(parentOf(parentOf(x)));
}
}
}
root.color = BLACK;
}
ConcurrentHashMap
特性
特性
- 基于 hash map 实现
- 提供所有 map 接口
- 不允许以 null 作为键或者值
- 线程安全,检索操作不需要锁定整个表
- 检索操作通常不会阻塞,所以有可能和修改操作重叠
- 迭代时的顺序按照键值排列,即存储顺序是有限的(在这个基础上,同时实现了
NavigableMap
接口,用于查找某个 key 之前的所有元素等操作)
ConcurrentHashMap 实现原理
在 JDK 1.7 及其之前的版本中,ConcurrentHashMap
使用 Segement
数据结构作为上锁的最小单元,每一个 Segment
容纳了一个 table,结构如图所示(引用自 JOE-1992 博客):
每当进行 get 操作时,定位到对应的 segment,上锁,并执行 put 操作。
hash 操作
计算 hash 时,同样使用了内置的 hash 函数对 hash 进行再次求解。不过相比于 HashMap,ConcurrentHashMap 使用了single-word Wang/Jenkins hash 算法的变种。代码如下:
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
note:Wang/Jenkins Hash 算法关键特性
- 雪崩性(更改输入参数的任何一位,就将引起输出有一半以上的位发生变化)
- 可逆性(input ==> hash ==> inverse_hash ==> input)
因此,使用 Wang/Jenkins Hash 更加能够获得冲突更小的 hash。
存储
在每一个 Segment
中,使用 HashEntry
作为存储结构,HashEntry
的定义如下:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//保证set对所有线程可见(volatile 语义)
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
其中,UNSAFE 为 Java 直接访问内存的函数,objectFieldOffset
为获得某一变量的偏移量, putOrderedObject(this, nextOffset, n)
是将 n 放在 this 偏移量 nextOffset 的位置,并且是一个具有 volatile 语义的修改,对所有的线程可见。
Segment
直接继承 ReentrantLock
,可以简化锁或者一些单独的构造器,使得其可以单独的当成一个锁。结构如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//真实的存储结构
transient volatile HashEntry<K,V>[] table;
//对一个 segment 所有元素计数
transient int count;
transient int modCount;
//当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
transient int threshold;
final float loadFactor;
//... method
}
整个类 维持了一个 Segment 数组,和必要的信息,如下:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/**
* segments 的掩码值
* key 的散列码的高位用来选择具体的 segment
*/
final int segmentMask;
/**
* segment 外偏移量,和segment 维持多少个 table 有关
*/
final int segmentShift;
/**
* 由 Segment 对象组成的数组,每个都是一个特别的Hash Table
*/
final Segment<K,V>[] segments;
// 根据 hash 找到对应 segment
private Segment<K,V> segmentForHash(int h) {
// 重点在 (h >>> segmentShift) & segmentMask 函数
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 在 volatile 的环境下读取 segments 的第 u 个元素
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}
}
在更老的版本中,采用直接加锁的方式修改数据,代码如下:
// ConcurrentHashMap 方法
public V put(K key, V value) {
if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值
throw new NullPointerException();
int hash = hash(key.hashCode()); //计算键对应的散列码
//根据散列码找到对应的 Segment
return segmentFor(hash).put(key, hash, value, false);
}
// Segment 方法
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); //当前的segment加锁
try {
int c = count;
if (c++ > threshold) //如果超过再散列的阈值
rehash(); //执行再散列,table 数组的长度将扩充一倍
HashEntry<K,V>[] tab = table;
//把散列码值与 table 数组的长度减 1 的值相“与”
//得到该散列码对应的 table 数组的下标值
int index = hash & (tab.length - 1);
//找到散列码对应的具体的那个桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //如果键/值对以经存在
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 设置 value 值
}
else { //键/值对不存在
oldValue = null;
++modCount; //添加新节点到链表中,modCont 要加 1
// 创建新节点,并添加到链表的头部
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; //写 count 变量
}
return oldValue;
} finally {
unlock(); //解锁
}
}
// get
V get(Object key, int hash) {
if(count != 0) { // 首先读 count 变量
HashEntry<K,V> e = getFirst(hash);
while(e != null) {
if(e.hash == hash && key.equals(e.key)) {
V v = e.value;
if(v != null)
return v;
// 如果读到 value 域为 null,说明发生了重排序,加锁后重新读取
return readValueUnderLock(e);
}
e = e.next;
}
}
return null;
}
在如上的实现中,最重要的是 get 函数的 count = c
操作和 put 函数的 count != 0
操作。这时实现可见性的关键。
当一个线程上锁修改之后,会调用 count = c
。 count 是 volatile 类型的变量,这时就会将修改之后的 map 对所有线程可见。
当另一个线程读取时,会调用 count != 0
,这个操作保证了更新之后的值对当前线程可见。
对于 1.7 版本,参见技术世界
对于读操作,获取Key所在的Segment时,需要保证可见性。具体实现上可以使用volatile关键字,也可使用锁。但使用锁开销太大,而使用volatile时每次写操作都会让所有CPU内缓存无效,也有一定开销。ConcurrentHashMap使用如下方法保证可见性,取得最新的Segment。
Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)
获取Segment中的HashEntry时也使用了类似方法
HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
获取锁时,并不直接使用lock来获取,因为该方法获取锁失败时会挂起。事实上,它使用了自旋锁,如果tryLock获取锁失败,说明锁被其它线程占用,此时通过循环再次以tryLock的方式申请锁。如果在循环过程中该Key所对应的链表头被修改,则重置retry次数。如果retry次数超过一定值,则使用lock方法申请锁。
这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗CPU资源比较多,因此在自旋次数超过阈值时切换为互斥锁。
为更好支持并发操作,ConcurrentHashMap会在不上锁的前提逐个Segment计算3次size,如果某相邻两次计算获取的所有Segment的更新次数(每个Segment都与HashMap一样通过modCount跟踪自己的修改次数,Segment每修改一次其modCount加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总size相等,可直接作为最终结果返回。如果这三次计算过程中Map有更新,则对所有Segment加锁重新计算Size。该计算方法代码如下
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
对于 1.8 版本,参见技术世界 ,使用 CAS 直接完成任务。
对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为当前值。如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,然后进行操作。如果该put操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率。
对于读操作,由于数组被volatile关键字修饰,因此不用担心数组的可见性问题。同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变更,无须关心它们被修改后的可见性问题。而其Value及对下一个元素的引用由volatile修饰,可见性也有保障。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
对于Key对应的数组元素的可见性,由Unsafe的getObjectVolatile方法保证。
put、remove和get操作只需要关心一个Segment,而size操作需要遍历所有的Segment才能算出整个Map的大小。一个简单的方案是,先锁住所有Sgment,计算完后再解锁。但这样做,在做size操作时,不仅无法对Map进行写操作,同时也无法进行读操作,不利于对Map的并行操作。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
size操作
put方法和remove方法都会通过addCount方法维护Map的size。size方法通过sumCount获取由addCount方法维护的Map的size。
ConcurrentSkipListMap
使用跳表实现。
跳跃表原理参见漫画算法:什么是跳跃表