map
HashMap:
JDK1.7
HashMap 里面是一个数组(transient Node<K,V>[] table),然后数组中每个元素是一个单向链表,由Node内部类实现;
数组的优点是:
数组是顺序存储结构,通过数组下标可以快速实现对数组元素的访问,效率极高;
数组的缺点是:
插入或删除元素效率较低,因为可能需要数组扩容、移动元素;
链表的优点是:
链表是一种链式存储结构,插入或删除元素不需要移动元素,只需要修改指向下一个节点的指针域,效率较高;
链表的缺点是:
链表访问元素需要从头到尾逐个遍历,效率较低;
JDK1.8 hashMap优化
1.数据存储结构,1.8中,如果链表长度超过了8,那么链表将转换为红黑树(平衡二叉树,TreeNode<K,V>),优化链表的查询速率,节点是根据hash值排序的
链表时的 复杂度为O(n),红黑树的时候O(log(n))
2.发生hash碰撞时,1.7会在链表的头部插入,而1.8会在链表的尾部插入
3.1.8中Entry被Node(实现Map.Entry接口)替代
4.hash的实现,1.8中,是通过hashCode的高16位异或低16位实现的
(h = k.hashCode()) ^ (h >>> 16)
主要是从性能,hash碰撞来考虑,减少系统的开销,也不会造成因为高位没有参与下表的计算,从而引起的碰撞(减少hash碰撞)
hash值的实现(JDK 1.8)
当key为null时,hash为0
其他key的hash为hashCode的高16位异或低16位,hash是32位
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
下标的计算方式
i = (n - 1) & hash;//n为数组长度
hashMap的扩容
hashMap初始容量16,扩容因子0.75,容量最大值2^30
如果当HashMap的容量超过12时,则进行扩容.
新建一个Node<K,V>[]数组,遍历原数组数据,赋值给新数组(需要重新计算每个元素的下标)
新hashMap容量为原Map的2倍
HashMap的put操作源码分析
1、调用哈希函数获取Key对应的hash值,再计算其数组下标;
2、如果没有出现哈希冲突,则直接放入数组,如果出现哈希冲突,则以链表的方式放在链表后面;
3、如果链表长度超过阀值( TREEIFY THRESHOLD==8),就把链表转成红黑树;
4、如果结点的key已经存在,则替换其value即可;
5、如果集合中的键值对大于12,调用resize方法进行数组扩容
HashMap的get操作源码分析
1、根据key的hash值计算数组的下标;
2、根据计算得到的数组下标访问数组元素,如果数组元素为null,则返回空;
3、根据计算得到的数组下标访问数组元素,如果数组元素不为null,则遍历该数组元素单向链表的每个节点,如果某个节点的key与当前key相等,则把该节点的值返回;
4、根据计算得到的数组下标访问数组元素,如果数组元素不为null,则遍历该数组元素单向链表的每个节点,如果某个节点的key与当前key都不相等,则返回null;
5.判断元素是否为要查询的元素条件为
first.hash == hash && ((k = first.key) == key || (key != null && key.equals(k)))
即hash值一致,key值一致
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
HashMap的常见面试
1、HashMap 的数据结构?
哈希表结构(数组+链表)实现,结合数组和链表的优点,当链表长度超过8时,链表转换为红黑树;
2、HashMap的hash运算如何实现的?为什么这样实现?
HashMap为什么不直接使用对象的原始hash值?它的实现代码如下:
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
JDK 1.8 中,是通过 hashCode() 的高 16 位异或低 16 位实现的:(h = k.hashCode()) ^ (h >>> 16),主要是从性能,hash碰撞来考虑的,减少系统的开销,也不会造成因为高位没有参与下标的计算,从而引起的碰撞;
使用异或操作,一个是提高性能,一个减少hash碰撞;
通过移位和异或运算,可以让 hash 变得更复杂,进而影响 hash 的分布性;
3、HashMap的容量是多少?加载因子是什么?容量如何变化?容量不够怎么办?
数组大小是由 capacity 这个参数确定的,默认是16,也可以构造时传入,最大限制是1<<30;
loadFactor 是装载因子,主要目的是用来确认table 数组是否需要动态扩展,默认值是0.75,比如table 数组大小为 16,装载因子为 0.75 时,threshold 就是12,当 table 的实际大小超过 12 时,table就需要动态扩容;
扩容时,调用 resize() 方法,将 table 长度变为原来的两倍;
扩容时创建一个新的数组,其容量为旧数组的两倍,并重新计算旧数组中结点的存储位置;
如果数据量很大的情况下,扩容时将会带来性能的损失,在性能要求很高的地方,这种操作性能很低;
4、什么是hash碰撞,发生hash碰撞怎么办?
如果两个键计算出来的数组下标一样,那么就产生了hash碰撞,hash碰撞的解决办法有
- 开放地址法
- 再哈希法
- 链地址法(拉链法) -->hashmap采用是该办法
- 建立一个公共溢出区,
HashMap采用的是3.链地址法(拉链法),当发生冲突时,将新结点添加在链表后面;
5、HashMap 和 HashTable 有什么区别?
HashMap 是线程不安全的,HashTable 是线程安全的;
由于线程安全,所以 HashTable 的效率比不上 HashMap;
HashMap最多只允许一条记录的键为null,允许多条记录的值为null,而 HashTable不允许;
HashMap 默认初始化数组的大小为16,HashTable 为 11,前者扩容时,扩大两倍,后者扩大两倍+1;
HashMap 需要重新计算 hash 值,而 HashTable 直接使用对象的 hashCode;
6、HashMap 与 ConcurrentHashMap 的区别?
除了加锁之外,原理上无太大区别,ConcurrentHashMap 类(是 Java并发包 java.util.concurrent 中提供的一个线程安全且高效的 HashMap 实现)。
ConcurrentHashMap,在 JDK 1.7 中采用分段锁的方式,JDK 1.8 中直接采用了CAS + synchronized,另外HashMap 的键值对允许有null,但是ConCurrentHashMap 都不允许。
7、我们能否让HashMap实现同步(线程安全)?
当然可以,使用Map map = Collections.synchronizeMap(hashMap);
Java7 中的ConcurrentHashMap
ConcurrentHashMap 和 HashMap 基本相似,但由于要支持多线程并发操作,所以代码要复杂很多;
整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”一段“,所以我们经常也称为分段锁,也有人用“槽”来代表一个 segment;
通俗来说ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
ConcurrentHashMap 默认有 16 个 Segments,即理论上最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上,这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的;
每个Segment内部是一个HashEntry<K,V>[]数组,每一个HashEntry相当于一个单向链表
ConcurrentHashMap的初始化:
初始化需要3个参数:
1.initialCapacity 初始化容量 默认值16
2.loadFactor 加载因子 默认值0.75 , 给HashEntry<K,V>[]数组使用的
3.concurrencyLevel 并发等级 默认值16
Segment[]数组大小由concurrencyLevel 决定,并决定了一个ConcurrentHashMap被分为多少段,相当于最多支持多少个线程(不涉及同一段的数据的话)
HashEntry<K,V>[]数组的length由 cap = initialCapacity / concurrencyLevel 决定,但是初始默认值是2
while (cap < initialCapacity / concurrencyLevel)
cap <<= 1;
ConcurrentHashMap的put方法:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)//ConcurrentHashMap 的key 和 value不可以为null 否则空指针
throw new NullPointerException();
//计算key的hash值,一系列的位移计算
int hash = hash(key);
//根据hash值找到Segment数组中的位置j
//hash 是 32 位,无符号右移segmentShift(28) 位,剩下低4位,
//与segmentMask(15)做一次与操作,也就是说j是hash值的最后4位,即槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
// ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在
//插入第一个值的时候进行初始化;
//如果有多个线程同时进来初始化同一个槽 segment[k],只有一个成功;
//用的Unsafe的compareAndSwapObject的native方法实现的
s = ensureSegment(j);//如果获取到的数据是null,则初始化该下标的Entry[]数组
//插入新值到槽s中
return s.put(key, hash, value, false);
}
根据key计算hash值,然后通过hash值与segmentShift,segmentMask 计算segments数组下标.
通过下标获取对应Entry<K,V>[]数组,最后的数据插入是在Entry中的.
segmentShift,segmentMask 初始化时逻辑如下:
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
槽中的put操作
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock()获取锁,如果没有获取锁,通过scanAndLockForPut自旋
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//计算出HashEntry数组的下标
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {//链表不为空,遍历链表中的元素与插入key比较
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//进行扩容,segment 数组不能扩容,segment 数组某个位置内部的数组
//HashEntry[] 可以进行扩容,扩容后容量为原来的 2 倍;
//触发扩容的时机是put 的时候如果该值的插入会导致该 segment 的元素个数
//超过阈值,那么先进行扩容再插值;
//该方法不需要考虑并发,因为持有该 segment 的独占锁的;
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();//释放锁
}
return oldValue;
}
加锁:内部类Segment继承至ReentrantLock.
采用的是ReentrantLock的非公平锁
tryLock()方法由
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
scanAndLockForPut自旋获取锁:
在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁;
该方法有两个出口,一个是 tryLock() 成功循环终止,另一个就是重试次数超过MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁;
所以该方法主要就是获取该 segment 的独占锁,如果需要同时实例化了node;
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
Java8 ConcurrentHashMap
Java7 中ConcurrentHashMap的实现比较复杂,在Java8 对其进行了比较大的改动,摒弃了java7的segment设计,Java8 中ConcurrentHashMap的实现与Java8中HashMap的实现比较类似,也是采用 数组+链表+红黑树 的形式,数组可以扩容,链表可以转化为红黑树,只是需要保证线程并发安全;
构造方法及初始化
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
该初始化方法根据提供的初始容量,计算出sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。未指定initialCapacity ,则sizeCtl 默认为16;
put 过程分析
put 数据的时候使用CAS+sync.
CAS用于存放数组中的数据.
sync用于为链表进行数据的存放
因为给数组放数据可以是原子操作.而给链表需要进行遍历不可能是原子操作,所以必须加锁.
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
//用于记录相应链表的长度
int binCount = 0;
//死循环,存放数据,直到数据存放成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果数组"空",进行数组初始化
tab = initTable();
//找该 hash 值对应的数组下标,得到第一个节点f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果数组该位置为空,
//用一次CAS操作将这个新值放入其中即可,此时put操作结束
//如果CAS失败,那说明有并发操作,那么自旋进入到下一次循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash等于MOVED,需要扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移
tab = helpTransfer(tab, f);
else {//到这里说明f是该位置的头结点,而且不为空
V oldVal = null;
//获取数组该位置的头结点的监视器锁
//锁的是当前数组位置的链表Node<K,V>
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {//头结点的hash值大于0,说明是链表
//用于累加,记录链表的长度
binCount = 1;
//遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果找到相等的key,
//判断是否要进行值覆盖,然后就可以break
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//到了链表的最末端,将该新值放到链表的最后面
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//调用红黑树的方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断是否要将链表转换为红黑树,
//临界值和HashMap 一样也是 8
if (binCount >= TREEIFY_THRESHOLD)
//该方法和HashMap中稍有不同,那就是它不是一定会进行红黑树转换,
//如果当前数组长度小于64,会选择进行数组扩容,而不是转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
hash计算:
// HASH_BITS = 0x7fffffff ,h = key.hashCode()
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
数组下标计算:
(n - 1) & hash
初始化数组:initTable
主要是初始化一个合适大小的数组,然后设置 sizeCtl。
初始化方法中的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//初始化操作被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
//让出CPU
Thread.yield(); // lost initialization race; just spin
// CAS,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//DEFAULT_CAPACITY 默认初始容量是16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化数组,长度为 16 或初始化时提供的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//将这个数组赋值给table,table是volatile的
table = tab = nt;
//如果n 为 16,那么 sc = 12,也就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
链表转红黑树: treeifyBin
Java8中treeifyBin 不一定就会进行红黑树转换,也可能是仅仅做数组扩容,一旦链表中的元素个数超过了8个,那么可以执行数组扩容或者链表转为红黑树,这里依据的策略跟HashMap依据的策略是一致的。
当数组长度还未达到64个时,优先数组的扩容,否则选择链表转为红黑树;
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//MIN_TREEIFY_CAPACITY 为 64
//如果数组长度小于 64,也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
//遍历链表,建立一棵红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//将红黑树设置到数组相应位置中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}