并发容器之阻塞容器(一)jdk7 ConcurrentHashmap源码分析

首先上UML 类图:

image.png

整个 ConcurrentHashMap 由Segment数组 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。在我的理解看来,这个类似于分库分表中的水平切分,把一个大的hashtable 根据hash值来切分成一个个小的hashtable ,然后每个hashtable采用一个表锁。
而且小hashtable至少是2个bucket MIN_SEGMENT_TABLE_CAPACITY=2
结构如下图:
image.png

初始化过程:

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        // 找到2的倍数大于等于concurrencyLevel,假如concurrencyLevel=16
        int sshift = 0;
        int ssize = 1;   //segement的大小
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //sshift =4; ssize =16;
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //假如把一个bucket看做hashtable中的一行,那么initialCapacity是分表中所有行的总行数。
        //c是每个分表能获取几行。
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化,第一个分表, cap =2
        Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        //注意,这里不是TBASE ,应该是数组的基地址
        /**
         * 设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者
         *  有延迟的<code>putObjectVolatile</cdoe>方法,并且不保证值的改变被其他线程立
         *   即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候
         *    使用才有用。
         *    public native void putOrderedObject(Object obj, long offset, Object value);
         * @https://www.cnblogs.com/mickole/articles/3757278.html Unsafe 的API
         */
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

上面的代码中涉及到几个final变量。在用Unsafe 直接操作内存的方法中,大量被用到。

private static final sun.misc.Unsafe UNSAFE;
    private static final long SBASE;
    private static final int SSHIFT;
    private static final long TBASE;
    private static final int TSHIFT;
    private static final long HASHSEED_OFFSET;

    static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            //hashtable的基地址
            TBASE = UNSAFE.arrayBaseOffset(tc);
            //segement数组的基地址,也就是segement[0]的地址
            SBASE = UNSAFE.arrayBaseOffset(sc);
            //table scale hashtable的单位偏移量
            ts = UNSAFE.arrayIndexScale(tc);
            // segement scale segement的单位偏移量
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
                    ConcurrentHashMap.class.getDeclaredField("hashSeed"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
            throw new Error("data type scale not a power of two");
        /**
         * Integer.numberOfLeadingZeros(ss) 找到变量ss前面0的个数,比如说
         * ss =4 表示换成二进制就是)0x0000-0000-0000-0000-0000-0000-0000-0100
         * Integer.numberOfLeadingZeros(ss) = 29
         * 32 - 29-1(0100中的1 需要减去) =2
         * 
         * 在get方法中,hash的高4位,取模,确定index索引,然后本来应该是索引乘以segementScale 也就是下面的ss变量=4 是4个字节。
         * 但是这里直接采用左移来代替乘法,会更快。左移两位代表乘以4.最后加上segement的基地址
         * long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
         */
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }
  • //hashtable的基地址 TBASE = UNSAFE.arrayBaseOffset(tc);
  • //segement数组的基地址,也就是segement[0]的地址 SBASE = UNSAFE.arrayBaseOffset(sc);
  • //table scale hashtable的单位偏移量 ts = UNSAFE.arrayIndexScale(tc);
  • // segement scale segement的单位偏移量 ss = UNSAFE.arrayIndexScale(sc);
  • SSHIFT index索引<< SSHIFT +SBASE 获得某一个segement的地址
  • TSHIFT 同理,((long)i << TSHIFT) + TBASE,分表hashtable内部的HashEntry的定位。
    另外,Unsafe类是直接操作内存的一个工具,但是用的不好的话,是会造成数据不一致性。少用为妙。附上介绍其API的一片文章:https://www.cnblogs.com/mickole/articles/3757278.html

put 过程分析(分段锁如何实现)

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // segments不是volatile的,所以check一下
                (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);   //ensureSegment这个方法是初始化segement对象
        return s.put(key, hash, value, false);
    }
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node

    // 循环获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 进到这里说明数组该位置的链表是空的,没有任何元素
                    // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 顺着链表往下走
                e = e.next;
        }
        // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
        //    lock() 是阻塞方法,直到获取锁后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
                 //     所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。
···
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;

// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
    // e 是链表的第一个元素
    HashEntry<K,V> e = oldTable[i];
    if (e != null) {
        HashEntry<K,V> next = e.next;
        // 计算应该放置在新数组中的位置,
        // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
        int idx = e.hash & sizeMask;
        if (next == null)   // 该位置处只有一个元素,那比较好办
            newTable[idx] = e;
        else { // Reuse consecutive sequence at same slot
            // e 是链表表头
            HashEntry<K,V> lastRun = e;
            // idx 是当前链表的头结点 e 的新位置
            int lastIdx = idx;

            // 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
            for (HashEntry<K,V> last = next;
                 last != null;
                 last = last.next) {
                int k = last.hash & sizeMask;
                if (k != lastIdx) {
                    lastIdx = k;
                    lastRun = last;
                }
            }
            // 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
            newTable[lastIdx] = lastRun;
            // 下面的操作是处理 lastRun 之前的节点,
            //    这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                V v = p.value;
                int h = p.hash;
                int k = h & sizeMask;
                HashEntry<K,V> n = newTable[k];
                newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
            }
        }
    }
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;

}
···
这里的扩容比之前的 HashMap 要复杂一些,代码难懂一点。上面有两个挨着的 for 循环,第一个 for 有什么用呢?

仔细一看发现,如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不需要做任何操作。

我觉得 Doug Lea 的这个想法也是挺有意思的,不过比较坏的情况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆。

get 过程分析

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根据 hash 找到对应的 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 内部数组相应位置的链表,遍历
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

总结:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357

推荐阅读更多精彩内容