ConcurrentHashMap解析二(putVal方法的解析)

ConcurrentHashMap中put()这个方法很容易引起并发操作的问题,现在来研究下put()方法的实现

put方法

public V put(K key, V value) {
    return putVal(key, value, false);
}
  //onlyIfAbsent跟HashMap一样,就是判断是否要覆盖,默认为false,覆盖
final V putVal(K key, V value, boolean onlyIfAbsent) {
  //这句话可以看出,ConcurrentHashMap中不允许存在空值,这个是跟HashMap的区别之一
  //通过这个机制,我们可以通过get方法获取一个key,如果抛出异常,说明这个key不存在
    if (key == null || value == null) throw new NullPointerException();
    //ConcurrentHashMap中的hash值计算方法,跟HashMap中的差不多,不过最后的结果要
     //与0x7fffffff进行与操作,这个地方我还不明白为什么有这个与操作
    int hash = spread(key.hashCode());
    //binCount=0说明首节点插入,未进行链表或红黑树操作,因为后面会对这个值进行更改
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
    //如果数组为空或者长度为0,进行初始化工作
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
    //如果获取位置的节点为空,说明是首节点插入情况
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
      //如果hash值等于MOVEN(默认-1),说明是协助扩容,这个在后边讲解ForwardingNode类
      //进行进一步解析
        else if ((fh = f.hash) == MOVED)
        //协助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
          //对桶的首节点进行加锁
            synchronized (f) {
                //双重判定,为了防止在当前线程进来之前,i地址所对应对象已经更改
                if (tabAt(tab, i) == f) {
                //为什么fh一定要大于等于0,这个原因在于TreeBin的hash值的设定,TreeBin类型
                //的hash值默认设置为了-2
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                          //在当前桶中找到位置跳出
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //当到桶的结尾还没找到,则新增一个Node
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果hash小于0,判断是否是TreeBin的实例
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
        //如果binCount值不等于0,说明进行了链表或者红黑树操作
            if (binCount != 0) {
                //如果binCount大于8则进行树化,但真正的转换成红黑树不是8的长度
                //当长度超过64才会真正的树化,处于8-64之间的还只是数组扩容
                if (binCount >= TREEIFY_THRESHOLD)
                    //这个方法我在红黑树一节详细讲解了,不清楚的可以看红黑树那节
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
      //计数方法
    addCount(1L, binCount);
    return null;
}

我对这个方法进行了注释,可以直观的看代码进行了解,主要分析一下三个方法
1、数组初始化方法initTable()
2、线程协助扩容方法helpTransfer()
3、计数方法addCount()

数组初始化initTable()

我们来看下这个方法的代码:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
          //判断是否数组为空或者数组长度为0(未初始化)
        while ((tab = table) == null || tab.length == 0) {
              //如果sizeCtl值小于0,则每个进入到这里的线程要作线程让步操作
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //SIZECTL是地址偏移量,和前边我们讲到的tabAt()那三个方法的地址偏移量一样
            //如果SIZECTL对应地址的值与sc相等,说明当前的线程是第一个到达这条语句的
             //线程,那么就会将SIZECTL地址所对应的值替换成-1,而SIZECTL地址偏移量对应的  
           // 对象就是sizeCtl。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                      //再次进行判断,防止在进行U.compareAndSwapInt(this, SIZECTL, sc, -1)的时候
                      //有其他线程并发进入方法,导致出错,使用双重锁
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //如果未初始化,会将sizeCtl设置成为当前容量类似hashmap中阈值threshold
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

这个地方有点疑问,sizeCtl到底指代什么,是阈值还是容量,在传入容量的构造方法中,是直接将cap赋值给sizeCtl,也就是说在这里的sizeCtl是容量大小,而数组初始化的时候,又将容量的0.75倍赋值给了sizeCtl,找了下网上的资料
引用这个链接(https://www.jianshu.com/p/c0642afe03e0)的说法:
sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
**-1 **代表table正在初始化
**-N **表示有N-1个线程正在进行扩容操作
其余情况:
1、如果table未初始化,表示table需要初始化的大小。
2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。
也就是说,sizeCtl的值有这几种情况,跟hashmap中单一指代阈值和容量的不同

线程协助扩容方法helpTransfer()

讲这个方法之前,我们需要先了解一些其他的类与方法,首先是ForwardingNode,一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //MOVEN默认值-1
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k){...}
    }

然后是方法resizeStamp(),这个方法是计算校验码的

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

好,现在我们来看下helpTransfer()方法的使用:

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }   

首先判断是否能够协助扩容的条件
1、数组不为空
2、传入的首节点是ForwardingNode的实例
3、该实例的nextTable类型不为空
在这三个条件的前提下,进行下一步,通过resizeStamp()方法拿到一个校验码,然后再判断sizeCtl的值是否小于0,因为sizeCtl负数表示正在扩容。

停止扩容的四个条件(这部分我还没理解,等后面理解了回来补充):
1、(sc >>> RESIZE_STAMP_SHIFT) != rs
2、sc == rs + 1
3、 sc == rs + MAX_RESIZERS
4、transferIndex <= 0

如果符合扩容条件,就使用U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)在sc上加一,然后调用transfer()方法进行数据迁移,这个方法比较复杂,到时候拿出来单独讲.
参考链接:
https://blog.csdn.net/u011392897/article/details/60479937

计数方法addCount()

    private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            //计数部分
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();
            }
          //扩容部分
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }       

这个方法分为两部分,第一部分是计数,第二部分是扩容
第一部分计数,
当counterCells为空并且U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)返回true,则直接到下一部分是否扩容的判断,否则 CAS 失败后转入更新 counterCells ,防止CAS 自旋,使用的方法是fullAndCount(),这个方法取自striped64的LongerAdd的方法。
关于striped64的解析详情可以看以下链接:
http://brokendreams.iteye.com/blog/2259857
说到这个,我就不得不提一下这个类CounterCell,这个类代码如下:

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

我们看到这个类用@sun.misc.Contended进行了注释,这个注释是自动填充缓存行用的,为什么要自动填充缓存行?
因为这样可以防止伪共享的情况,那么什么是伪共享呢?
我解释一下,就是当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享,也就是避免头结点和为节点加载到同一个缓存行,使头尾节点在修改时不会互相锁定,基于此,我们需要自动填充缓存行。
这个类中我们可以看到只有一个volatile修饰符的变量value,也就是单个counterCell对象累加值

第二部分扩容,
满足当前容量大于sizeCtl值并且数组不为空,数组长度小于最大容量值的时候,就开始扩容,跟helpTransfer()方法中一样,这里也进行了同样的判定,当线程刚进来的时候,sc是正的,所以执行else if的语句,U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)
这条语句将sizeCtl直接赋值成了一个负数,如果赋值成功,则调用数据迁移方法transfer().
在代码的最后有个s = sumCount()的语句,这个是ConcurrentHashmap内部计数完反馈给我们的值,在size()中也有相关调用:

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

我们来看下sumCount()方法的内部实现:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

可以看出这个计算是分为两部分,基础部分是baseCount部分,然后还要加上countCells数组的所有值,通过之前的分析我们知道,baseCount是CAS成功后会自动累加的值,而countCells数组是在CAS失败,也就是出现并发的情况下,进行累加的数组,这个数组类似segmenet一样,采用分段锁,最后将二者的值相加从而得到一个近似值。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,458评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,030评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,879评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,278评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,296评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,019评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,633评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,541评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,068评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,181评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,318评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,991评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,670评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,183评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,302评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,655评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,327评论 2 358

推荐阅读更多精彩内容

  • 转载自:https://halfrost.com/go_map_chapter_one/ https://half...
    HuJay阅读 6,154评论 1 5
  • 1.ConcurrentHashmap简介 在使用HashMap时在多线程情况下扩容会出现CPU接近100%的情况...
    huanfuan阅读 606评论 0 2
  • 昨天因为孩子不收拾卫生的问题在群里求助,得到姐妹们的各种建议,真的很开心,能量得到了加持,在这个群里,我们可以敞开...
    李沁李沁阅读 182评论 0 0
  • 张力佳站在凯丽丝大厦前,无力地挂断电话,又一次被女朋友姚丽丽拒绝。 她说要加班,晚上不能一起吃饭。 他不相信地摇摇...
    沐玉声声阅读 409评论 1 9
  • 尘归尘,土归土 大地是一本书 你在天上想起谁,翻开书,看看 你那双善良的眼睛望着谁,打开书看看 那是最疼的唏嘘 春...
    琴歌素简阅读 251评论 0 0