浅谈ConcurrentHashMap

简介

ConcurrentHashMap是java.util.concurrent包下的重要成员,也是Java并发容器中使用较为频繁的。

ConcurrentHashMap不是采用和HashTable一样的方法,在访问操作的时候使用synchronized保证只有一个线程能操作,而通过一种更细粒度的锁来实现锁住部分哈希表,从而提高并发性能。

但是在使用上ConcurrentHashMap是可以与HashTable互操作的,它遵循HashTable的规范,也就是说在Java5之后,随着ConcurrentHashMap的引入,想要提高性能的话可以把使用HashTable的地方替换成ConcurrentHashMap。可以说,ConcurrentHashMap的到来标志之HashTable的过时。

如果性能不是考虑的因素,只需要保证Map的线程安全的话,还可以通过Java的集合工具类获取一个经过包装的SynchronizedMap,原理就是通过包装器模式(也叫装饰模式)把所有对原Map的操作进行一次synchronized包装:

Map m = Collections.synchronizedMap(new HashMap())

理解ConcurrentHashMap的前提

由于涉及到并发编程,也就是说我们要理解ConcurrentHashMap的时候需要知道一些Java中提供的并发编程类库和知识体系,下面我主要大概介绍下ConcurrentHashMap需要提前知道的知识点,不做具体深入。

volatile关键字

Java中使用volatile关键字来保证变量的修改能够被其他线程知道,读和写一个volatile变量都有全局的排序(也就是不会被重排序,Java内存允许编译器和处理器对指令重排序),volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方。

ReentrantLock

ReentrantLock是Java中用于并发编程的一个可重入的互斥锁,通常称为再入锁。Java5之后提供的锁实现,通过lock()实现加锁,通过unlock()实现锁释放。

相对于synchronized同步锁,支持公平锁和非公平锁来实现一定程度的公平性,代码书写也相对灵活。

Unsafe

JDK中提供了类sun.misc.Unsafe来支持任意内存地址位置的数据读写和一些CAS(compareAndSwap)原子操作,Unsafe不对用户开放,只对Java核心类库开放,通过判断调用者是否为Bootstrap类加载器加载的类来判断,可用通过反射获取该实例,但是建议在不得已的情况下不要使用Unsafe。

CAS

CAS(compare and swap,比较并交换)是实现原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。

ConcurrentHashMap分析

ConcurrentHashMap随着Java的升级过程中也一直在升级演变,所以这里主要分析大概架构以及对比Java7和Java8中ConcurrentHashMap的变化。

ConcurrentHashMap结构

ConcurrentHashMap的大致结构如下,通过Segment对Map进行分段保存,当操作的时候只需要对对应的Segment进行锁定即可(Segment自身继承了ReentrantLock),不用像HashTable一样对整个链表锁定,进而大大提高性能。

image.png

ConcurrentHashMap In Java7

几个重要常量

// 如果没有指定Map大小,默认为16
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 如果没有指定默认的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 默认的并发等级,也就是Segment数组的大小
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 该Map的最大容量,即2的30次方
static final int MAXIMUM_CAPACITY = 1 << 30;

// Segment中HashEntry数组的最小长度,必须是2的倍数,最小是2为了避免懒加载构造之后下次调用会立即扩容
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// Segment数组的默认最长长度(2的16次方),必须是2的倍数,Segment最大是`1<<24`(2的24次方)
static final int MAX_SEGMENTS = 1 << 16; //比较保守

// 在非同步操作的情况下的重试次数(size,containsValue),避免在哈希表在连续修改的情况下出现无限重试
static final int RETRIES_BEFORE_LOCK = 2;

创建&初始化

  • ConcurrentHashMap构造函数
// 三个参数的构造函数(初始化大小,加载因子,并发等级)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

// 两个参数的构造函数(初始化大小,加载因子)
public ConcurrentHashMap(int initialCapacity, float loadFactor)

// 一个参数的构造函数(初始化大小)
public ConcurrentHashMap(int initialCapacity)

// 无参构造函数
public ConcurrentHashMap()

// 通过其他Map创建
public ConcurrentHashMap(Map<? extends K, ? extends V> m)

其中,需要关心的三个参数的构造函数,其他构造函数都是自定义+默认值(上述常量)最终调用三个参数的构造函数创建

  • 初始化
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    // 检查
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并发等级不能超过MAX_SEGMENTS
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // 根据concurrencyLevel计算Segment的长度,必须是2的倍数
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 保存segmentShift和segmentMask用于索引Segment
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    // 初始大小不能超过MAXIMUM_CAPACITY
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 计算每个Segment大小,初始化大小除以segment数组长度
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 计算每个Segment初始化HashEntry大小,最小为MIN_SEGMENT_TABLE_CAPACITY
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 创建Segment数组,和第一个Segment
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 使用UNSAFE写入Segment数组
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

其中,为何在初始化期间创建第一个Segment是因为后面的Segment的创建都是以segments[0]作为模板创建的

get操作

get操作只是获取操作,这里没有任何同步代码或者锁,支持对数据的可见性

public V get(Object key) {
    // 手动集成访问方法以减少开销
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 获取hash值
    int h = hash(key);
    // 通过位运算计算散列所在Segment
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 通过UNSAFE获取对应的Segment,getObjectVolatile方法能保证其可见性
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 遍历获取到对应的值
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    // 找不到返回空
    return null;
}

put操作

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    //和get操作一样定位到Segment
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    //通过Segment的put操作保存
    return s.put(key, hash, value, false);
}

所以,重点的逻辑在Segment的put方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 无论如何获取锁,首先尝试获取锁,如果获取不到通过scanAndLockForPut获取锁
    // scanAndLockForPut会通过while循环获取锁,直到获取到锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        //通过hash值定位到对应链表即HashEntry
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);//定位到具体HashEntry
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                //如果已经存在相同key,更新value
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                //如果不存在key则新增一个元素
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                //如果已经超过阈值则执行rehash进行扩容,然后保存
                //注意,扩容只是在Segment中扩容 
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

size操作

ConcurrentHashMap中的size是一个比较有意思的操作,因为获取Map大小是一个比较常用的方法,如果不加锁获取可能会不准确因为同时可能有其他线程在操作map,如果加锁获取将会比较耗时,这里采用了通过指定重试次数(RETRIES_BEFORE_LOCK)(2次)的方法来获取大小;具体就是在在重试次数内比较两次获取的size,如果一直则认为是准确值,如果不一致进行锁定然后再获取size

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            //如果达到重试次数,锁定
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            //计算size
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            //比较两次获取的结果是否一致
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        //如果锁定,则释放锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

ConcurrentHashMap在Java8新变化

ConcurrentHashMap在Java8中的已经发生比较大的变化,基本的结构和Java8的HashMap差不多,同样的当链表长度超过8的时候使用红黑树;使用volatile保证可见性,Segment已经被取消,保留Segment只是为了反序列化的兼容,初始化改为懒加载,并发控制精确每个链表,使用synchronize同步锁代替再入锁,使用CAS操作等。

数据结构

image.png

put操作

get操作就是类似HashMap的检索比较,这里重点关注put操作,看看在Java8中是如何实现并发操作的,其他操作暂不展开

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            //如果是第一次put,进行初始化(懒加载)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果链表为空,使用CAS进行无锁put
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            //如果处于扩容阶段,
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //这里在链表/树的第一个元素加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        //如果是链表
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        //如果是红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    //如果是长度达到TREEIFY_THRESHOLD(默认是8),执行树化操作
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

懒加载初始化

这里可以看到使用CAS操作,实现table的初始化,并且使用双重检查(double-check)保证重复初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //进入循环检查
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            //如果正在初始化,等待
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //进入初始化操作
            try {
                //再次检查
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

总结

可以看到,随之Java平台的升级,ConcurrentHashMap也是逐渐演变,但是都是为了更好的达到ConcurrentHashMap的设计目标:

  • 在最小化实现更新操作的同时保持并发的可读性
  • 是空间消耗到达和HashMap差不多或者更好

ConcurrentHashMap还有续续多多的实现细节,需要仔细阅读源码才能理解,并且涉及到许多底层操作的实现,能力有限,暂时能理解的就这么多,后续希望继续深入理解。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容

  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,263评论 4 56
  • 正确使用Context一般Context造成的内存泄漏,几乎都是当Context销毁的时候,却因为被引用导致销毁失...
    慶孟國阅读 389评论 0 0
  • 这是第2次心理咨询的上半部分。 在去之前,我有点好奇今天能得到什么新的启发;同时又觉得,好像也没什么问题特别想要探...
    YiWei_阅读 467评论 0 1
  • 不知不觉5天过去了,时间总是过得很快,很开心参与了这个活动,认识了一些人,并坚持到了现在。 想起接触活动后打卡的第...
    三七糖阅读 251评论 0 0
  • 视频课程,大家可以点击 了解完在线营销的基本知识和益处,并评估了您的网站是否已做好充分准备后,下面就来了解一下 G...
    amygoogle阅读 278评论 0 0