juc系列-ConcurrentHashMap

1 概述

ConcurrentHashMap和HashMap一样都是基于散列的容器,ConcurrentHashMap可以认为是一种线程安全HashMap,它使用了一中完全不同的加锁策略提高并发性和伸缩性。
ConcurrentHashMap并不是将每个方法在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为“分段锁”。

2 静态结构

2.1 ConcurrentHashMap主要构件:

ConcurrentHashMap中主要实体类就是三个:

  • ConcurrentHashMap(整个Hash表)
  • Segment(桶)
  • HashEntry(节点)

对应下面的图可以看出之间的关系,静态内部类HashEntry用来封装映射表的键值对,Segment充当锁的角色,每个 Segment 对象维护散列表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。


concurrenthashmap.png

下面是ConcurrentHashMap类的声明:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

其中实现了ConcurrentMap接口,定义如下:

public interface ConcurrentMap<K, V> extendsMap<K, V> {
 
    V putIfAbsent(K key, V value);
 
    boolean remove(Object key, Object value);
 
    boolean replace(K key, V oldValue, V newValue);
 
    V replace(K key, V value);
}

其中定义了4个常用的复合操作:

V putIfAbsent(K key, V value);

如果没有这个key,则放入这个key-value,返回null,否则返回key对应的value。

boolean remove(Object key, Object value);

移除key和对应的value,如果key对应的不是value,移除失败

boolean replace(K key, V oldValue, V newValue);

替代key对应的值,仅当当前值为旧值

V replace(K key, V value);

替代key对应的值,只要当前有值

2.2 Segment 类

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    
        private static final long serialVersionUID = 2249069246763182397L;

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        transient volatile HashEntry<K,V>[] table;

        transient int count;  //本segment中HashEntry的个数

        transient int modCount; //table被更新的次数

        transient int threshold;  //当table中的HashEntry个数超过该值后,需要table再散列

        final float loadFactor; //装载因子

segment结构示意图:


segment.jpg

Segment是一个可重入锁,每个Segment就相当于一个HashMap,是一个数组和链表结构,维持了一个HashEntry数组。

2.3 HashEntry

Map内的键值对是用hashEntry类封装的,其中hash,key,next域都被声明为final属性,value被声明为volatile类型。下面是HashEntry源码:

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;    
        。。。
}

3 动态算法

3.1 init-初始化流程

ConcurrentHashMap初始化:

public ConcurrentHashMap(int initialCapacity,
                        float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1; //ssize为segment个数(默认16)
        while (ssize < concurrencyLevel) {//找到最佳匹配参数(不小于concurrencyLevel的最小2次幂)
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;    //总容量/segment个数 = 每个segment容量
        if (c * ssize < initialCapacity)    //确保segment个数*segment容量大于总的容量
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY; //找到不小于c的最小2次幂,cap最终为每个segment的容量
        while (cap < c) 
            cap <<= 1;
        // create segments and segments[0]
//这里区别于JDK1.6区别的地方,只创建出segments[0],并且引入了UNSAFE类的putOrderedObject()方法,这个方法特点是低延时,速度快。而1.6中循环segments数组,为每个元素创建segment对象。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

为什么segments数组大小必须是2的n次方?

为了能通过按位与操作来定位segments数组的索引值。

segmentShift和segmentMask干什么用滴?

我们知道基于散列的容器是通过元素的hashCode值来确定元素在容器中的索引的,那么ConcurrentHashMap中定位一个元素至少需要两步:

  • 定位segment
  • 定位HashEntry

segmentShift和segmentMask的作用就是用来定位segment元素的,如下:

//若ssize=16,即segments数组大小为16,那么sshift=4
this.segmentShift = 32 - sshift;//segmentShift=32-4=28
this.segmentMask = ssize - 1;//掩码segmentMask=15,对应二进制1111,每位上都是1

((hash >>> segmentShift) & segmentMask)//定位segmetns
(hash & (tab.length-1))//定位HashEntry

3.2 service-常用操作分析

3.2.1.get(Object key)

ConcurrentHashMap的get方法主要步骤如下:

  1. 通过key的hash值,确定所属的segment的索引。
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

默认情况下segmentShift=28,segmentMask=15,u是hash后的值,为32位,h >>> segmentShift) & segmentMask)即让hash值得高4位参与到运算。

  1. 通过UNSAFE类的getObjectVolatile方法获取segment。
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
  1. 通过hash值确定HashEntry数组位置,遍历链表找出entry
        for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if((k = e.key) == key || (e.hash == h && key.equals(k)))
                returne.value;
        }

下面是get方法整体源码:

publicV get(Object key) {
    Segment<K,V> s;
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null&&
        (tab = s.table) != null) {
        for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if((k = e.key) == key || (e.hash == h && key.equals(k)))
                returne.value;
        }
    }
    return null;
}

性能分析:

可以看到整个get方法执行过程中并没有进行同步操作,get操作无需加锁时间复杂度O(1)

3.2.2.put(K key, V value)

put操作步骤:

//ConcurrentHashMap的put源码
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)//value不允许为null
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;//定位segment元素
        if ((s = (Segment<K,V>)UNSAFE.getObject 
             (segments, (j << SSHIFT) + SBASE)) == null)
            s = ensureSegment(j);//第一次访问segment时,创建segment对象
        return s.put(key, hash, value, false);//调用segment的put方法
    }
    
    //下面是segment中的put方法:
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        //tryLock()加锁,不成功则执行scanAndLockForPut
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                //下面for循环是进行元素定位即更新操作
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //key已存在,通过onlyIfAbsent决定是否更新值
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    //key不存在,构造node节点,通过UNSAFE类插入节点,注意:这里插入是头插法
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);//插入链表头node.next = first;
                        int c = count + 1;
                        //是否需要扩容,注意:只对当前segment扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
需要注意的点:
  • 插入新节点的时候,是头插法。
  • 如果count > threshold需要扩容,那么只对当前segment扩容。
  • 若元素已存在,不一定更新,只有当onlyIfAbsent=false的时候才更新。

在put方法中,首先进行加锁操作,tryLock失败后并不会按照常规思路阻塞当前线程,而是执行scanAndLockForPut方法,下面重点分析下这个方法做了什么工作?

//Segment类中的常量MAX_SCAN_RETRIES:最大扫描次数,在scanAndLockForPut方法中会用到
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
            
    //scanAndLockForPut的源码:
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            //定位HashEntry数组位置,获取第一个节点
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1;//扫描次数
            
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // 构造新节点
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // 首节点有变动,更新first,重新扫描
                    retries = -1;
                }
            }
            return node;
        }

scanAndLockForPut的作用:

  • 当前线程获取不到锁的时候并没有闲着,而是查找key是否已经存在,如果当前链表中没有查到,则新建一个HashEntry对象。
  • 新建HashEntry节点后,当retries <= MAX_SCAN_RETRIES时,不断通过tryLock尝试获取锁,retries > MAX_SCAN_RETRIES,则调用lock(),此时若还获取不到锁,那么当前线程就被阻塞,这点类似于自旋锁。
  • 在检索key的时候,别的线程可能正在对segment进行修改,所以要做如下检查:
//每间隔一次循环,检查一次first节点是否改变
if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {
        e = first = f; // 首节点有变动,更新first,重新扫描
        retries = -1;
}

通过scanAndLockForPut方法,当前线程就可以在即使获取不到segment锁的情况下,完成潜在需要添加节点的实例化工作,当获取锁后,就可以直接将该节点插入链表即可。还实现了类似于自旋锁的功能,防止执行put操作的线程频繁阻塞,这些优化都提升了put操作的性能。

3.2.3.全局操作 size()/isEmpty()/clear()

由于分段技术的缘故,ConcurrentHashMap的全局操作(size/isEmpty/clear)就会变得复杂

size操作

要统计ConcurrentHashMap的元素个数,就要将所有segment的元素个数求和。直接累加?肯定不行,在并发环境中,这样得到的结果并不准确。对所有segment加锁再求和?这样做结果肯定正确,但是这违背了ConcurrentHashMap设计的初衷,在并发环境中要有良好的变现。ConcurrentHashMap中size方法实现选择了一个折中方案:

先尝试两次不对Segment加锁方式来统计count值,如果统计过程中count发生变化了,再加锁。如果没有改变,则返回size。

isEmpty操作

isEmpty操作会对segment对象的count进行检查,如果count!=0,那么直接返回false,同时对所有segment的modCount变量进行统计,统计两次的结果如果一致,那么表示容器在此之间没有发生变化,那么返回true.

clear操作

clear操作将容器中的所有元素移除,通过遍历segments数组,将每个segment对象中元素清除,源码如下:

    public void clear() {
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> s = segmentAt(segments, j);
            if (s != null)
                s.clear();
        }
    }

调用clear方法后,容器一定被清空了吗?

考虑这么一种情况,当A线程执行clear方法时,已经将segments[0]对象清空了,此时B线程执行put(key,value)方法,如果key散列到segments[0]上,那么A执行完后容器中还有元素!所以clear方法是弱一致的

ConcurrentHashMap弱一致性问题?

上面分析clear方法的若一致性,其实size()/isEmpty()/clear()都是在无锁清空下执行的,这带来的后果就是数据的不一致。所以在选择容器的时候要注意,如果非常强调全局方法的实时性和一致性,那么ConcurrentHashMap并不是一个好选择。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容