并发编程之并发容器

并发容器

HashMap的并发版本ConcurrentHashMap
ConcurrentSkipListMap 和 ConcurrentSkipListSet
LinkedList的并发版本ConcurrentLinkedQueue

写时复制容器

CopyOnWriteArrayList
CopyOnWriteArraySet

阻塞队列

常用阻塞队列
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

看ConcurrentHashMap之前先顺便说说HashMap
HashMap又分jdk1.7和1.7之后版本
1.7的hashMap结构是数组+链表,这里找几个比较有意思的地方分析下
1.7的hashMap在并发下的三个bug数据丢失,数据重复,死循环

hash冲突

hash冲突解决方法:1,开放寻址(开放寻址法的核心是如果出现了散列冲突,就重新探测一个空闲位置,将其插入。当我们往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,我们就从当前位置开始,依次往后查找,看是否有空闲位置,直到找到为止。);
2,再散列,3链地址法(HashMap就是)
这里HashMap的构造方法里默认的数组Entry<K,V>[] table长度是16(不会new,在put的时候才会去new,这个是导致第一个bug的原因),
第一个bug数据丢失,因为在put的时候才会new Entry<K,V>[],并发情况下就可能new多个..然后put到哪去了呢...

这里再说说第二个bug
多线程下put同一个元素,假设put的时候都前面的条件都通过了,我们看看createEntry()方法

void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

往这个table[bucketIndex]赋值的时候线程1先赋值了table[i] = tooko,线程2进来先拿到e=线程1tooko,新建一个Entry他的next又是线程1tooko,就变成了了table[i]=线程2tooko, 线程2tooko-->线程1tooko
第二个bug数据重复

再看看扩容,扩容的条件是(size >= threshold) && (null != table[bucketIndex]) threshold是在put和扩容的时候去算的为容量加载因子,容量是16加载因子0.75的话就是12;
也就是说默认情况下,第一次扩容是元素的数量大于等于12并且,这个下标没有元素的时候进行扩容,所以即便是map里面已经有七八十个元素了,也可能不回去扩容,极限情况是刚好第十二个元素put的时候去扩容
这个newTable的length是table的length
2

void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;①
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];②
            newTable[i] = e;
            e = next;
        }
    }
}

看看这两步
e.next = newTable[i];
newTable[i] = e;
这两步操作链表的我们用table里的nana-->konoha来分析下,假设旧table里table[3]上是nana-->konoha
这里第一次循环拿到nana后nana-->null,newTable[3]=nana;
第二次循环拿到konoha后,konoha-->nana,newTable[3]=konoha
所以在新的newTable里我们的nana-->konoha变成了konoha-->nana,就是说newTable[i]会把table[i]里的链表翻转

这里如果是在多线程情况下线程1刚好进到while循环里的①步骤拿到了e=nana,线程2把nana-->konoha变成了konoha-->nana;完了,nana-->konoha-->nana,线程1在while里出不去了
这就是HashMap的并发下的bug之一死循环

我们看看里面其他有意思的东西
table长度默认是16即便是有参构造去赋值也会变成大于等于这个值的2的次方,比如有参构造传的30,那么会通过inflateTable(30)变成32,
inflateTable(16)里面有个特别流弊的位运算取n的大于等于2的次方数
Integer.highestOneBit((number - 1) << 1);

public static int highestOneBit(int i) {
    // HD, Figure 3-1
    i |= (i >>  1);
    i |= (i >>  2);
    i |= (i >>  4);
    i |= (i >>  8);
    i |= (i >> 16);
    return i - (i >>> 1);
}

这个算法我收藏下,嘿嘿嘿~(这个算法在1.7的ConcurrentHashMap在通过并发级别算Segment[]的长度的时候用了另一种实现,当然只是用位运算就实现了更厉害些)

int ssize = 1;
while (ssize < number) {
    ssize <<= 1;
}

为什么一定要是2的次方呢,因为取模的操作 a % (2^n) 等价于 a & (2^n-1),位运算效率高于模运算,所以这里选择2的次方作为table的length.
比如说一个Entry的key为"konoha",hash后的值是19,table的length是16, 19 & (16-1) = 3,相当于19%16=3;
那么key为"konoha"的Entry就放在table[3]下另一个Entry的key为"miu"hash后是20,算出的索引就是4就放在table[4]下,
如果此时一个Entry的key为"nana"hash后是35,算出的index也是3,这个时候就用到链表了,table[3]下是key为"nana"的Entry,key为"nana"的Entry的next就是key为"konoha"的Entry(注意这里是头插法,后来的挂头上,旧的挂新的next上,所以就是table[3]=nana,nana-->konoha)

ok,hashMap就简单说说三个并发bug,还有为什么数组的长度要是2的n次方

我们在看看1.8的HashMap
首先在结构上发生了变化,由数组+链表变成了数组+链表+红黑树
如果链表长度到达阀值(默认是8,注释上说根据泊松分布8的时候桶里的数据已经很少了),就会将链表转换成红黑树
我们看看put方法

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

当遍历的时候发现table[i]的元素大于等于7(TREEIFY_THRESHOLD - 1)的时候开始使用红黑树treeifyBin(tab, hash),扩容的时候发现newTable[i]上的元素小于6(hc <= UNTREEIFY_THRESHOLD)的时候从红黑树转回链表;
普通的二叉树会写,红黑树太麻烦了后面再去研究
我们发现1.8里是先添加进去之后再去检测扩容,1.7里是扩容后再去添加
还有1.8里扩容时的链表移到newTable的时候不会翻转了


ConcruuentHashMap
1.7中Segment[i]= new HashEntry[],HashEntry[i]-->链表,即数组[数组[]]+链表
就是说和Segment[]数组中有每个下标下有一个或者多个HashEntry[]数组,然后HashEntry的对象上面挂链表

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

在ConcurrentHashMap的构造函数里面 会通过并发级别计算出Segment[]的长度,比如说默认并发级别是16 DEFAULT_CONCURRENCY_LEVEL = 16;那么Segment[]就是大于16的2的n次幂,刚好是16.然后用容量initialCapacity一除,在取最大2的n次幂,算出每个Segment里面的HashEntry[cap]的长度

通过上面的构造方法可以发现和hashMap不同,ConcurrentHashMap在初始化的时候就把Segment[]new出来了,并且还用CAS操作(UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0])给Segment[0]也赋值一个HashEntry[cap];
我们再看看put方法
先根据key的hash值在Segment数组中找到相应的位置i如果Segment[i]里面还未初始化,则通过CAS进行赋值,有就直接取到Segment,接着执行Segment对象的put方法,这个Segment是继承了ReentrantLock的,关于ReentrantLock我们知道的,在上一篇AQS里我们就分析过
他这里没有直接用lock(),Segment的put方法进来后先trylock(),拿到锁了就继续执行,没有拿到就scanAndLockForPut()

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

在scanAndLockForPut方法中,while循环执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程;
当上一个线程执行完插入操作时,会通过finally{}的unlock()方法释放锁,释放时唤醒等待线程继续执行 ;
和HashMap不一样,ConcurrentMap的size()不是直接取的size,而是去统计的

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

先采用不加锁的方式,连续计算元素的个数,如果前后两次计算结果相同,则说明计算出来的元素个数是准确的,直接返回;
如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;

看看1.8的ConcurrentHashMap,去掉了Segment[];结构和1.8的HashMap一样,数组+链表+红黑树
也是在put的时候去new Node[],但是他用了CAS去保证多线程下不会new多个Node[]

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)//如果容量小于0,说明有线程进入了下面的else if,然后该线程yield()让出执行权
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS把容量改为-1;
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//new出Node[]
                    table = tab = nt;
                    sc = n - (n >>> 2);//把sizeCtl置为容量的3/4(0.75f)应该是要当做扩容时的阈值用
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

put的时候
如果相应位置的Node还未初始化,则通过CAS插入相应的数据;
如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点
如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点
如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;

size();
1.8的size有记录baseCount,put和remove的时候都会通过CAS去更新,除此之外还有CounterCell[],这个放没来得及更新baseCount的记录遍历出来累加

关于ConcurrentSkipListMap 和 ConcurrentSkipListSet,这两个和HashMap,HashSet差不多,HashSet就是通过HashMap实现的,只是所有的Value都是同一个object
ConcurrentSkipListMap 就是跳表,类似二叉树,put的时候会构建多级索引

https://juejin.im/post/5cf8c50ff265da1b76389223

Queue:https://juejin.im/post/5dee37ad51882512657ba41f

总结:
HashMap本来就是线程不安全的,并发条件下建议使用ConcurrentHashMap,注意下1.7死循环的原因,1.7和1.8的结构不同.
ConcruuentHashMap注意1.7和1.8的结构:1.7多了Sement[],锁的是Sement,没有红黑树;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容