并发容器
HashMap的并发版本ConcurrentHashMap
ConcurrentSkipListMap 和 ConcurrentSkipListSet
LinkedList的并发版本ConcurrentLinkedQueue
写时复制容器
CopyOnWriteArrayList
CopyOnWriteArraySet
阻塞队列
常用阻塞队列
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
看ConcurrentHashMap之前先顺便说说HashMap
HashMap又分jdk1.7和1.7之后版本
1.7的hashMap结构是数组+链表,这里找几个比较有意思的地方分析下
1.7的hashMap在并发下的三个bug数据丢失,数据重复,死循环
hash冲突
hash冲突解决方法:1,开放寻址(开放寻址法的核心是如果出现了散列冲突,就重新探测一个空闲位置,将其插入。当我们往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,我们就从当前位置开始,依次往后查找,看是否有空闲位置,直到找到为止。);
2,再散列,3链地址法(HashMap就是)
这里HashMap的构造方法里默认的数组Entry<K,V>[] table长度是16(不会new,在put的时候才会去new,这个是导致第一个bug的原因),
第一个bug数据丢失,因为在put的时候才会new Entry<K,V>[],并发情况下就可能new多个..然后put到哪去了呢...
这里再说说第二个bug
多线程下put同一个元素,假设put的时候都前面的条件都通过了,我们看看createEntry()方法
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
往这个table[bucketIndex]赋值的时候线程1先赋值了table[i] = tooko,线程2进来先拿到e=线程1tooko,新建一个Entry他的next又是线程1tooko,就变成了了table[i]=线程2tooko, 线程2tooko-->线程1tooko
第二个bug数据重复
再看看扩容,扩容的条件是(size >= threshold) && (null != table[bucketIndex]) threshold是在put和扩容的时候去算的为容量加载因子,容量是16加载因子0.75的话就是12;
也就是说默认情况下,第一次扩容是元素的数量大于等于12并且,这个下标没有元素的时候进行扩容,所以即便是map里面已经有七八十个元素了,也可能不回去扩容,极限情况是刚好第十二个元素put的时候去扩容
这个newTable的length是table的length2
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;①
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];②
newTable[i] = e;
e = next;
}
}
}
看看这两步
e.next = newTable[i];
newTable[i] = e;
这两步操作链表的我们用table里的nana-->konoha来分析下,假设旧table里table[3]上是nana-->konoha
这里第一次循环拿到nana后nana-->null,newTable[3]=nana;
第二次循环拿到konoha后,konoha-->nana,newTable[3]=konoha
所以在新的newTable里我们的nana-->konoha变成了konoha-->nana,就是说newTable[i]会把table[i]里的链表翻转
这里如果是在多线程情况下线程1刚好进到while循环里的①步骤拿到了e=nana,线程2把nana-->konoha变成了konoha-->nana;完了,nana-->konoha-->nana,线程1在while里出不去了
这就是HashMap的并发下的bug之一死循环
我们看看里面其他有意思的东西
table长度默认是16即便是有参构造去赋值也会变成大于等于这个值的2的次方,比如有参构造传的30,那么会通过inflateTable(30)变成32,
inflateTable(16)里面有个特别流弊的位运算取n的大于等于2的次方数
Integer.highestOneBit((number - 1) << 1);
public static int highestOneBit(int i) {
// HD, Figure 3-1
i |= (i >> 1);
i |= (i >> 2);
i |= (i >> 4);
i |= (i >> 8);
i |= (i >> 16);
return i - (i >>> 1);
}
这个算法我收藏下,嘿嘿嘿~(这个算法在1.7的ConcurrentHashMap在通过并发级别算Segment[]的长度的时候用了另一种实现,当然只是用位运算就实现了更厉害些)
int ssize = 1;
while (ssize < number) {
ssize <<= 1;
}
为什么一定要是2的次方呢,因为取模的操作 a % (2^n) 等价于 a & (2^n-1),位运算效率高于模运算,所以这里选择2的次方作为table的length.
比如说一个Entry的key为"konoha",hash后的值是19,table的length是16, 19 & (16-1) = 3,相当于19%16=3;
那么key为"konoha"的Entry就放在table[3]下另一个Entry的key为"miu"hash后是20,算出的索引就是4就放在table[4]下,
如果此时一个Entry的key为"nana"hash后是35,算出的index也是3,这个时候就用到链表了,table[3]下是key为"nana"的Entry,key为"nana"的Entry的next就是key为"konoha"的Entry(注意这里是头插法,后来的挂头上,旧的挂新的next上,所以就是table[3]=nana,nana-->konoha)
ok,hashMap就简单说说三个并发bug,还有为什么数组的长度要是2的n次方
我们在看看1.8的HashMap
首先在结构上发生了变化,由数组+链表变成了数组+链表+红黑树
如果链表长度到达阀值(默认是8,注释上说根据泊松分布8的时候桶里的数据已经很少了),就会将链表转换成红黑树
我们看看put方法
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
当遍历的时候发现table[i]的元素大于等于7(TREEIFY_THRESHOLD - 1)的时候开始使用红黑树treeifyBin(tab, hash),扩容的时候发现newTable[i]上的元素小于6(hc <= UNTREEIFY_THRESHOLD)的时候从红黑树转回链表;
普通的二叉树会写,红黑树太麻烦了后面再去研究
我们发现1.8里是先添加进去之后再去检测扩容,1.7里是扩容后再去添加
还有1.8里扩容时的链表移到newTable的时候不会翻转了
ConcruuentHashMap
1.7中Segment[i]= new HashEntry[],HashEntry[i]-->链表,即数组[数组[]]+链表
就是说和Segment[]数组中有每个下标下有一个或者多个HashEntry[]数组,然后HashEntry的对象上面挂链表
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
在ConcurrentHashMap的构造函数里面 会通过并发级别计算出Segment[]的长度,比如说默认并发级别是16 DEFAULT_CONCURRENCY_LEVEL = 16;那么Segment[]就是大于16的2的n次幂,刚好是16.然后用容量initialCapacity一除,在取最大2的n次幂,算出每个Segment里面的HashEntry[cap]的长度
通过上面的构造方法可以发现和hashMap不同,ConcurrentHashMap在初始化的时候就把Segment[]new出来了,并且还用CAS操作(UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0])给Segment[0]也赋值一个HashEntry[cap];
我们再看看put方法
先根据key的hash值在Segment数组中找到相应的位置i如果Segment[i]里面还未初始化,则通过CAS进行赋值,有就直接取到Segment,接着执行Segment对象的put方法,这个Segment是继承了ReentrantLock的,关于ReentrantLock我们知道的,在上一篇AQS里我们就分析过
他这里没有直接用lock(),Segment的put方法进来后先trylock(),拿到锁了就继续执行,没有拿到就scanAndLockForPut()
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在scanAndLockForPut方法中,while循环执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程;
当上一个线程执行完插入操作时,会通过finally{}的unlock()方法释放锁,释放时唤醒等待线程继续执行 ;
和HashMap不一样,ConcurrentMap的size()不是直接取的size,而是去统计的
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
先采用不加锁的方式,连续计算元素的个数,如果前后两次计算结果相同,则说明计算出来的元素个数是准确的,直接返回;
如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;
看看1.8的ConcurrentHashMap,去掉了Segment[];结构和1.8的HashMap一样,数组+链表+红黑树
也是在put的时候去new Node[],但是他用了CAS去保证多线程下不会new多个Node[]
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//如果容量小于0,说明有线程进入了下面的else if,然后该线程yield()让出执行权
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS把容量改为-1;
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//new出Node[]
table = tab = nt;
sc = n - (n >>> 2);//把sizeCtl置为容量的3/4(0.75f)应该是要当做扩容时的阈值用
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put的时候
如果相应位置的Node还未初始化,则通过CAS插入相应的数据;
如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点
如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点
如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;
size();
1.8的size有记录baseCount,put和remove的时候都会通过CAS去更新,除此之外还有CounterCell[],这个放没来得及更新baseCount的记录遍历出来累加
关于ConcurrentSkipListMap 和 ConcurrentSkipListSet,这两个和HashMap,HashSet差不多,HashSet就是通过HashMap实现的,只是所有的Value都是同一个object
ConcurrentSkipListMap 就是跳表,类似二叉树,put的时候会构建多级索引
https://juejin.im/post/5cf8c50ff265da1b76389223
Queue:https://juejin.im/post/5dee37ad51882512657ba41f
总结:
HashMap本来就是线程不安全的,并发条件下建议使用ConcurrentHashMap,注意下1.7死循环的原因,1.7和1.8的结构不同.
ConcruuentHashMap注意1.7和1.8的结构:1.7多了Sement[],锁的是Sement,没有红黑树;