ConcurrentHashMap源码分析(JDK8) get/put/remove方法分析

与旧版本的主要区别

更小的锁粒度

jdk8中摒弃了segment锁,直接将hash桶的头结点当做锁。

旧版本的一个segment锁,保护了多个hash桶,而jdk8版本的一个锁只保护一个hash桶,由于锁的粒度变小了,写操作的并发性得到了极大的提升。

image.png

更高效的扩容

1 更多的扩容线程

扩容时,需要锁的保护。因此:旧版本最多可以同时扩容的线程数是segment锁的个数
而jdk8的版本,理论上最多可以同时扩容的线程数是:hash桶的个数(table数组的长度)。但是为了防止扩容线程过多,ConcurrentHashMap规定了扩容线程每次最少迁移16个hash桶,因此jdk8的版本实际上最多可以同时扩容的线程数是:hash桶的个数/16

2 扩容期间,依然保证较高的并发度

旧版本的segment锁,锁定范围太大,导致扩容期间,写并发度,严重下降。
而新版本的采用更加细粒度的hash桶级别锁,扩容期间,依然可以保证写操作的并发度。

image.png

由此可见,jdk8版本的ConcurrentHashMap,在扩容期间,依然可以保证非常高的并发度

扩容具体实现请参考:ConcurrentHashMap源码分析(JDK8) 扩容实现机制

ConcurrentHashMap的重要结构与方法

ConcurrentHashMap内部,和hashmap一样,维护了一个table数组,数组元素是Node链表或者红黑树.

image.png

Node

Key-value entry, 继承自Map.Entry<K,V>对象。


    static class Node<K,V> implements Map.Entry<K,V> {
        /**
          Node节点的hash值和key的hash值相同
          TreeNode节点的hash值
        **/
        final int hash;
        final K key;
        volatile V val;  //volatile确保了val的内存可见性
        volatile Node<K,V> next;//volatile确保了next的内存可见性
        ...
        ...
       
    }

TreeNode/TreeBin

Node的子类,红黑树节点,当Node链表过长时,会转换成红黑树。

    ...
    static final int TREEIFY_THRESHOLD = 8;
    ...
     //put方法中会判断:tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    ...

ForwardingNode

Node的子类,扩容时的标记节点,此处不做分析,请参考ConcurrentHashMap源码分析(JDK8)扩容实现机制

table数组

    /**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;

关于table数组,有3个重要方法


    //以volatile读的方式读取table数组中的元素
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //以volatile写的方式,将元素插入table数组
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    //以CAS的方式,将元素插入table数组
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        //原子的执行如下逻辑:如果tab[i]==c,则设置tab[i]=v,并返回ture.否则返回false
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    

我们思考一个问题:

旧版本对table数组元素的读写,都是在segment锁保护的情况下进行的,因此不会内存可见性问题。而jdk8的实现中,锁的粒度是hash桶,因此对table数组元素的读写,大部分都是在没有锁的保护下进行的,那么该如何保证table数组元素的内存可见性?

要解决这个问题,我们要了解如下2点:

  1. volatile的happens-before规则对一个volatile变量的写一定可见(happens-before)于随后对它的读
  1. CAS同时具有volatile读和volatile写的内存语义

然后,我们看下put方法,是如何利用上述规则,在没有锁的情况下,保证table数组元素的内存可见性:

image.png

如上图所示,在不加锁的情况下:线程A成功执行casTabAt操作后,随后的线程B可以通过tabAt方法立刻看到table[i]的改变。原因如下:线程A的casTabAt操作,具有volatile读写相同的内存语义,根据volatile的happens-before规则:线程A的casTabAt操作,一定对线程B的tabAt操作可见。

我们再思考如下问题:

ConcurrentHashMap中的锁是hash桶的头结点,那么当多个put线程访问头结点为空的hash桶时,在没有互斥锁保护的情况下,多个put线程都会尝试将元素插入头结点,此时如何确保并发安全呢?

image.png

要解决这个问题,我们要先了解什么是CAS操作:CAS,即compareAndSwap,原子的执行比较并交换的操作:当前内存值如果和预期值相等,则将其更新为目标值,否则不更新。

然后,我们看下ConcurrentHashMap的put方法是如何通过CAS确保线程安全的:

假设此时有2个put线程,都发现此时桶为空,线程一执行casTabAt(tab,i,null,node1),此时tab[i]等于预期值null,因此会插入node1。随后线程二执行casTabAt(tba,i,null,node2),此时tab[i]不等于预期值null,插入失败。然后线程二会回到for循环开始处,重新获取tab[i]作为预期值,重复上述逻辑。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        ...
        for (Node<K,V>[] tab = table;;) {
            ...
            //key定位到的hash桶为空
            if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //cas设置tab[i]的头结点。
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;   //设置成功,跳出for循环
                //设置失败,说明tab[i]已经被另一个线程修改了。回到for循环开始处,重新判断hash桶是否为空。如何往复,直到设置成功,或者hash桶不空。
            }else{
               synchronized (f) {
                   //
               }
                
            }
        }
        ...
    }

以上通过for循环+CAS操作,实现并发安全的方式就是无锁算法(lock free)的经典实现

java原子类的自增操作,也是通过for循环+CAS操作的方式实现的:

    //JDK7版本的 AtomicInteger 类的原子自增操作
    public final int getAndIncrement() {
        for (;;) {
            //获取value
            int current = get();
            int next = current + 1;
            //value值没有变,说明其他线程没有自增过,将value设置为next
            if (compareAndSet(current, next))
                return current;
            //否则说明value值已经改变,回到循环开始处,重新获取value。
        }
    }

get方法

get方法同样利用了volatile特性,实现了无锁读

查找value的过程如下:

  1. 根据key定位hash桶,通过tabAt的volatile读,获取hash桶的头结点。
  2. 通过头结点Node的volatile属性next,遍历Node链表
  3. 找到目标node后,读取Node的volatile属性val

可见上述3个操作都是volatile读,因此可以做到在不加锁的情况下,保证value的内存可见性

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            //定位目标hash桶,通过tabAt方法valatile读,读取hash桶的头结点
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //第一个节点就是要找的元素
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    //e.val也是valatile
                    return e.val;
            }
            //特殊节点(红黑树,已经迁移的节点(ForwardingNode)等
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍历node链表(e.next也是valitle变量)
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

put方法

  1. 由于锁的粒度是hash桶,多个put线程只有在请求同一个hash桶时,才会被阻塞。请求不同hash桶的put线程,可以并发执行。

  2. put线程,请求的hash桶为空时,采用for循环+CAS的方式无锁插入。

image.png
     public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //for循环+CAS操作
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //hash桶(tab[i])为空
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //CAS设置tab[i],不需要加锁
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //hash桶(tab[i])是fwd节点,表示正在扩容
            else if ((fh = f.hash) == MOVED)
                //帮其扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //hash桶不为空,对tab[i]中的头结点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //node链表
                        if (fh >= 0) {
                            binCount = 1;
                            
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //key-value entry已经存在,更新value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                
                                Node<K,V> pred = e;
                                //是尾节点,则插入
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //size属性+1,如果size属性大于扩容阈值(sizeCtl)则扩容
        addCount(1L, binCount);
        return null;
    }

remove方法

和put方法一样,多个remove线程请求不同的hash桶时,可以并发执行

image.png

如图所示:删除的node节点的next依然指着下一个元素。此时若有一个遍历线程正在遍历这个已经删除的节点,这个遍历线程依然可以通过next属性访问下一个元素。从遍历线程的角度看,他并没有感知到此节点已经删除了,这说明了ConcurrentHashMap提供了弱一致性的迭代器。遍历操作可以参考ConcurrentHashMap源码分析(JDK8) 遍历操作分析

    public V remove(Object key) {
        return replaceNode(key, null, null);
    }

    /**
    
       参数value:当 value==null 时 ,删除节点 。否则 更新节点的值为value
       
       参数cv:一个期望值, 当 map[key].value 等于期望值cv  或者 cv==null的时候 ,删除节点,或者更新节点的值
    */
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //table还没有初始化或者key对应的hash桶为空
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            //正在扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    //cas获取tab[i],如果此时tab[i]!=f,说明其他线程修改了tab[i]。回到for循环开始处,重新执行
                    if (tabAt(tab, i) == f) {
                        //node链表
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                //找的key对应的node
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    //cv参数代表期望值
                                    //cv==null:表示直接更新value/删除节点
                                    //cv不为空,则只有在key的oldValue等于期望值的时候,才更新value/删除节点
                                    
                                    //符合更新value或者删除节点的条件
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        //更新value
                                        if (value != null)
                                            e.val = value;
                                        //删除非头节点
                                        else if (pred != null)
                                            pred.next = e.next;
                                        //删除头节点
                                        else
                                            //因为已经获取了头结点锁,所以此时不需要使用casTabAt
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                //当前节点不是目标节点,继续遍历下一个节点
                                pred = e;
                                if ((e = e.next) == null)
                                    //到达链表尾部,依旧没有找到,跳出循环
                                    break;
                            }
                        }
                        //红黑树
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        //如果删除了节点,更新size
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

参考资料

http://blog.csdn.net/u010723709/article/details/48007881
https://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容