深度剖析 JDK7 ConcurrentHashMap 中的知识点

Lock Stripping

在 Java 中,普通的 HashMap 是非线程安全的,HashTable 是线程安全的 Map。从下面的代码可以看出来,它的线程安全是通过在 get 和 put 方法上加 synchronized 实现的,锁的粒度是整个对象,两个线程都不能同时 get,性能很差。

public class Hashtable<K,V>
    extends Dictionary<K,V>
    implements Map<K,V>, Cloneable, java.io.Serializable {
    
    public synchronized V get(Object key) {
        // 省略具体内容,只看方法定义
    }

    public synchronized V put(K key, V value) {
        // 省略具体内容,只看方法定义
    }
}   

为了改善并发操作的性能,Java 提供了 ConcurrentHashMap,它使用了锁分离技术,将原先 HashTable 中对象级的大锁,换成了多个小粒度的 Segment 锁,每一个 Segment 都相当于一个 HashTable。

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
            transient volatile HashEntry<K,V>[] table;
    }
ConcurrentHashMap 存储结构图

Unsafe

Java 不能直接访问操作系统底层,而是通过本地方法来访问,Unsafe 类提供了一些硬件级别的原子操作。通常,使用 Unsafe 是为了提升性能。

    private static final sun.misc.Unsafe UNSAFE;
    private static final long SBASE;
    private static final int SSHIFT;

    static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class sc = Segment[].class;
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ss = UNSAFE.arrayIndexScale(sc);
        } catch (Exception e) {
            throw new Error(e);
        }
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
    }
    
    static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
        long u = (j << SSHIFT) + SBASE;
        return ss == null ? null :
            (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
    }

在 ConcurrentHashMap 中,segmentAt 方法用于获取 Segment 数组中第 j 个元素的引用,其中的一些常量用到了 Unsafe 的 arrayBaseOffset、arrayIndexScale、getObjectVolatile 方法,其功能如下表所示:

方法 功能
arrayBaseOffset 获取数组第一个元素的偏移地址
arrayIndexScale 获取数组的转换因子,也就是数组中元素的增量地址
getObjectVolatile 获取 obj 对象中 offset 偏移地址对应的 object 型 field 的值,支持 volatile load 语义

Unsafe 的特性大致可以分为以下几类:

  1. 对变量和数组内容的原子访问,自定义内存屏障
  2. 对序列化的支持
  3. 自定义内存管理/高效的内存布局
  4. 与原生代码和其他JVM进行互操作
  5. 对高级锁的支持

更多 Unsafe 方法的功能请看:Java 中的 Unsafe 类详解

自旋

重量级的锁往往引起线程切换,而线程切换非常耗时。在竞争不激烈,锁占用时间较短(比线程切换耗时短)的环境里,通过 CPU 轮询来获取锁,往往效率更高。

在 Segment 的 remove 方法中,修改数据时需要获取锁,这时候并没有直接调用会阻塞的 lock 方法,而是调用了 tryLock,获取锁失败时进入了 scanAndLock 方法。scanAndLock 方法先是通过自旋来尝试获得锁,当自旋次数超过阈值 MAX_SCAN_RETRIES 时,调用 lock 方法来获得锁。

        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            // 省略其他
        }

        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

Sequential Consistency

在 scanAndLock 的文档说明里,提到了 "we must lock even if the key is not found, to ensure sequential consistency of updates",即使 key 不存在,也要加锁,保证更新的顺序一致性。

        /**
         * Scans for a node containing the given key while trying to
         * acquire lock for a remove or replace operation. Upon
         * return, guarantees that lock is held.  Note that we must
         * lock even if the key is not found, to ensure sequential
         * consistency of updates.
         */
        private void scanAndLock(Object key, int hash) {

Sequential Consistency 即顺序一致性,是一种内存一致性模型,它的定义是:

(并发程序在多处理器上的)任何一次执行结果都相同,就像所有处理器的操作按照某个顺序执行,各个微处理器的操作按照其程序指定的顺序进行。换句话说,所有的处理器以相同的顺序看到所有的修改。读操作未必能及时得到此前其他处理器对同一数据的写更新。但是各处理器读到的该数据的不同值的顺序是一致的。

现在的 CPU 和编译器会对代码做各种各样对优化,有时候为了改进性能而把代码执行顺序打乱,这样可能会导致错误对程序执行结果。为了保证顺序一致性,需要使用 Memory Barrier,一般同步原语(例如锁)会隐式地调用 Memory Barrier 指令。

Trade Offs

Trade Offs 即权衡,鱼与熊掌不可兼得。在一些系统性能优化中,经常使用空间换时间,或时间换空间。在 ConcurrentHashMap 中,也有一些权衡。

读与写

请看如下 Segment 类的 get 方法,其中并没有使用锁,显然效率会很高。那么 ConcurrentHashMap 是如何保证在 put 和 rehash 的过程中,读仍然能正常工作呢?

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

请看 put 方法的第 32 行,每一次 put 都把新的元素放到 HashEntry 链表的头结点,这样就不会破坏链表的结构,读操作就不会失败。

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

对于 rehash,请看下面的代码,在复制数据时,并没有去改变原先的 HashEntry 的 next 字段。在新的链表中,是重新 new 了 HashEntry 节点来装载数据。这样一来,旧的链表结构没有被破坏,读操作也就不会失败了。

        /**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

如果 rehash 时,所有 HashEntry 都 new 一个,会生成大量新对象,耗费过多内存。所以上面代码第 24~32 行,做了一些优化,重用了部分旧链表的节点,即代码注释里所写的 “Reuse consecutive sequence at same slot”。重用的关键是,找到旧链表中的一个关键节点 lastRun,该关键节点确保:从 lastRun 到旧链表的尾节点的所有数据,在新链表中的哈希位置一样。

综上所述,ConcurrentHashMap 通过巧妙的设计 put 和 rehash,多消耗了一些内存,换来了不需要加锁的高效的读操作。

一致性与效率

ConcurrentHashMap 的 clear 方法如下所示,其并没有一个全局的锁,这会导致清理完一个 Segment 后,正在清理下一个 Segment 时,前面的 Segment 有可能已经被其他线程放入了数据,这样就会导致 clear 方法结束的时候,ConcurrentHashMap 中可能存在数据。

    public void clear() {
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> s = segmentAt(segments, j);
            if (s != null)
                s.clear();
        }
    }

也就是说,相对于 HashTable 的强一致,ConcurrentHashMap 是弱一致的。那么弱一致性带来了什么好处呢?因为没有全局锁,从而可以提供更好的读写并发能力。

参考资料

  1. Java 中的 Unsafe 类详解
  2. sun.misc.Unsafe 的后启示录
  3. 顺序一致性与缓存一致性
  4. 维基百科:内存一致性模型
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容