https://mp.weixin.qq.com/s/VtIpj-uuxFj5Bf6TmTJMTw
为什么线程不安全
if ((p = tab[i = (n - 1) & hash]) == null) // 如果没有hash碰撞则直接插入元素
tab[i] = newNode(hash, key, value, null);
这是jdk1.8中HashMap中put操作的主函数, 注意代码,如果没有hash碰撞则会直接插入元素。如果线程A和线程B同时进行put操作,刚好这两条不同的数据hash值一样,并且该位置数据为null,所以这线程A、B都会进入第6行代码中。假设一种情况,线程A进入后还未进行数据插入时挂起,而线程B正常执行,从而正常插入数据,然后线程A获取CPU时间片,此时线程A不用再进行hash判断了,问题出现:线程A会把线程B插入的数据给覆盖,发生线程不安全。
首先HashMap是线程不安全的,其主要体现:
1.在jdk1.7中,在多线程环境下,扩容时会造成环形链或数据丢失。
2.在jdk1.8中,在多线程环境下,会发生数据覆盖的情况。
为什么设置这个默认值
默认为什么大小是16
可能就是一个经验值,定义16没有很特别的原因,只要是2次幂,其实用 8 和 32 都差不多。用16只是因为作者认为16这个初始容量是能符合常用而已。
Hashmap中的链表大小超过八个时会自动转化为红黑树,当删除小于六时重新变为链表
根据泊松分布,在负载因子默认为0.75的时候,单个hash槽内元素个数为8的概率小于百万分之一,所以将7作为一个分水岭,等于7的时候不转换,大于等于8的时候才进行转换,小于等于6的时候就化为链表。
并发安全的map
- 使用Collections.synchronizedMap(Map)创建线程安全的map集合;
- Hashtable
- ConcurrentHashMap
SynchronizedMap
在SynchronizedMap内部维护了一个普通对象Map,还有排斥锁mutex,如图
Map<String, Object> map = Collections.synchronizedMap(new HashMap<String, Object>());
我们在调用这个方法的时候就需要传入一个Map,可以看到有两个构造器,如果你传入了mutex参数,则将对象排斥锁赋值为传入的对象。
如果没有,则将对象排斥锁赋值为this,即调用synchronizedMap的对象,就是上面的Map。
创建出synchronizedMap之后,再操作map的时候,就会对方法上锁,如图全是🔐
Hashtable
Map<String, Object> map2 = new Hashtable<String, Object>();
对数据操作的时候都会上锁,所以效率比较低下。
Hashtable 跟HashMap的区别
- Hashtable 是不允许键或值为 null 的,HashMap 的键值则都可以为 null
- 因为Hashtable在我们put 空值的时候会直接抛空指针异常,但是HashMap却做了特殊处理。
- 这是因为Hashtable使用的是安全失败机制(fail-safe),这种机制会使你此次读到的数据不一定是最新的数据。
- 如果你使用null值,就会使得其无法判断对应的key是不存在还是为空,因为你无法再调用一次contain(key)来对key是否存在进行判断,
ConcurrentHashMap
同理
1.实现方式不同:
Hashtable
继承了Dictionary
类,而HashMap
继承的是AbstractMap
类。
2.初始化容量不同:HashMap
的初始容量为:16,Hashtable
初始容量为:11,两者的负载因子默认都是:0.75
3.扩容机制不同: 当现有容量大于总容量 * 负载因子时,HashMap
扩容规则为当前容量翻倍,Hashtable 扩容规则为当前容量翻倍 + 1
- 迭代器不同:
HashMap
中的Iterator
迭代器是fail-fast
的,而Hashtable
的Enumerator
不是 fail-fast 的。
所以,当其他线程改变了HashMap 的结构,如:增加、删除元素,将会抛出ConcurrentModificationException
异常,而 Hashtable 则不会.
fail-fast 快速失败
快速失败(fail—fast)是java集合中的一种机制, 在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增加、删除、修改),则会抛出
ConcurrentModificationException
。
迭代器在遍历时直接访问集合中的内容,并且在遍历过程中使用一个 modCount 变量。
集合在被遍历期间如果内容发生变化,就会改变modCount的值。
每当迭代器使用hashNext()/next()遍历下一个元素之前,都会检测modCount变量是否为expectedmodCount值,是的话就返回遍历;否则抛出异常,终止遍历。
Tip:这里异常的抛出条件是检测到 modCount!=expectedmodCount 这个条件。如果集合发生变化时修改modCount值刚好又设置为了expectedmodCount值,则异常不会抛出。
因此,不能依赖于这个异常是否抛出而进行并发操作的编程,这个异常只建议用于检测并发修改的bug。
fail—safe 安全失败
java.util.concurrent包下的容器都是安全失败,可以在多线程下并发使用,并发修改。
ConcurrentHashMap
ConcurrentHashMap 1.7的结构
数组 + 链表:由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。
Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
// 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
transient volatile HashEntry<K,V>[] table;
transient int count;
// 记得快速失败(fail—fast)么?
transient int modCount;
// 大小
transient int threshold;
// 负载因子
final float loadFactor;
}
HashEntry跟HashMap差不多的,但是不同点是,他使用
volatile
去修饰了他的数据Value还有下一个节点next。
并发度高的原因
ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。
每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。
就是说如果容量大小是16他的并发度就是16,可以同时允许16个线程操作16个Segment而且还是线程安全的。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();//这就是为啥他不可以put null值的原因
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
他先定位到Segment,然后再进行put操作。
put源代码
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//释放锁
unlock();
}
return oldValue;
}
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
- 尝试自旋获取锁。
- 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。
ConcurrentHashMap 1.8的结构
其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。
HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,
volatile Node<K,V>[] table
,transient volatile Node<K,V>[] nextTable
,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。
ConcurrentHashMap在进行put操作的还是比较复杂的,大致可以分为以下步骤
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话跑出异常
int hash = spread(key.hashCode()); //取得key的hash值
int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
for (Node<K,V>[] tab = table;;) { //
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //第一次put的时候table没有初始化,则初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界
if (casTabAt(tab, i, null, //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的
new Node<K,V>(hash, key, value, null))) //创建一个Node添加到数组中区,null表示的是下一个节点为空
break; // no lock when adding to empty bin
}
/*
* 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
/*
* 如果在这个位置有元素的话,就采用synchronized的方式加锁,
* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
* 如果找到了key和key的hash值都一样的节点,则把它的值替换到
* 如果没找到的话,则添加在链表的最后面
* 否则,是树的话,则调用putTreeVal方法添加到树中去
*
* 在添加完之后,会对该节点上关联的的数目进行判断,
* 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
*/
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { //再次取出要存储的位置的元素,跟前面取出来的比较
if (fh >= 0) { //如果是链表的话(hash大于0),取出来的元素的hash值大于0,当转换为树之后,hash值为-2
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //遍历这个链表
K ek;
if (e.hash == hash && //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,
pred.next = new Node<K,V>(hash, key, //为空的话把这个要加入的节点设置为当前节点的下一个节点
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //表示已经转化成红黑树类型了
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //调用putTreeVal方法,将该元素添加到树中去
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //计数
return null;
}
TreeBin
static final class TreeBin<K,V> extends Node<K,V> {
//指向TreeNode列表和根节点
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 读写锁状态
static final int WRITER = 1; // 获取写锁的状态
static final int WAITER = 2; // 等待写锁的状态
static final int READER = 4; // 增加数据时读锁的状态
/**
* 初始化红黑树
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
......
}
1.根据 key 计算出 hashcode 。
2.判断是否需要进行初始化。
3.即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
4.如果当前位置的 hashcode == MOVED == -1,则需要进行扩容, 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段, 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失。
5.如果都不满足,则利用 synchronized 锁写入数据。
6.如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
jdk1.8升级之后反而多了synchronized?
synchronized之前一直都是重量级的锁,但是后来java官方是对他进行过升级的,他现在采用的是锁升级的方式去做的
针对 synchronized 获取锁的方式,JVM 使用了锁升级的优化方式
先使用
偏向锁
优先同一线程然后再次获取锁,如果失败,就升级为 CAS 轻量级锁
,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁
ConcurrentHashMap get
- 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
- 如果是红黑树那就按照树的方式获取值。
- 就不满足那就按照链表的方式遍历获取值。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//1.根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//是红黑树那就按照树的方式获取值, 链表的hash > 0, hash < 0则表示是红黑树
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//就不满足那就按照链表的方式遍历获取值。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
ConcurrentHashMap的同步机制
读操作
在get操作中,没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的
1.桶内只是链表,直接遍历链表读了。无任何锁性质的东西。
2.桶内有红黑树,在TreeBin的find方法中操作,是读读同时进行的时候,用红黑树查找。这里用CAS设置LockState字段,不要去理解成成读读互斥了,并不是一个线程读完了才能让另一个线程读,是只有把lockState字段增加这个操作本身互斥而已。
举个例子:
两个线程同时读,A线程用CAS设置了LockState字段为读后,A开始真的做读操作。B线程并不需要等A读完才能读,B线程只需要等A设置完LockState字段后,自己就能去设置LockState字段了然后开始读了。
final Node<K,V> find(int h, Object k) {
// key不为空开始查找,否则直接返回null
if (k != null) {
// 遍历链表
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
//当锁状态为等待或写的状态时,以链表的方式的方式查找结点
//当为写状态的时候,读则采取遍历链表的方式,这样虽然查找复杂度提高,但读写不阻塞
//当为等待状态的时候,不继续加写锁,能让被阻塞的写线程尽快恢复运行,或者刚好让某个写线程不被阻塞
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
//否则是读读操作,设置TreeBin的LOCKSTATE读锁状态增加
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
// 利用红黑树来查找,速度很快
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
//如果是最后一个读线程,并且有写线程因为读锁而阻塞,要告诉写线程可以尝试获取写锁了
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
做了2件事:
1.key不为空开始查找,否则直接返回null
2.遍历链表:
2.1 读写并行,当锁状态为等待或写的状态时,以链表的方式的方式查找结点
2.1.1. 当为写状态的时候,读则采取遍历链表的方式,这样虽然查找复杂度提高,但读写不阻塞
2.1.2. 当为等待状态的时候,不继续加写锁,让被阻塞的写线程尽快恢复运行,或者刚好让某个写线程不被阻塞
2.2 否则就是读读并行,用CAS设置TreeBin的lockState字段增加,用红黑树查找,这里用CAS设置LockState字段,不要去理解成成读读互斥了,并不是一个线程读完了才能让另一个线程读,是只有把lockState字段增加这个操作本身互斥而已。如果是最后一个读线程,并且有写线程因为读锁而阻塞,要告诉写线程可以尝试获取写锁了。
多个线程又是如何同步处理的
在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe两种方式来完成的。
1.读读不互斥
2.读写不互斥
3.写写互斥
引起数组扩容的情况
- 只有在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。
- 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容
扩容的时候,可以不可以对数组进行读写操作
事实上是可以的。当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。
如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。
https://blog.csdn.net/weixin_43185598/article/details/87938882