Java1.8-ConcurrentHashMap源码解析(一)

一、概述

  从本篇文章开始,我将会对concurrent包下常用的类的使用以及源码学习进行一个记录,本篇文章主要是来了解下ConcurrentHashMap的源码。ConcurrentHashMap其实我们都不陌生了,是一种线程安全的Map集合,接下来我们就来看下的内部实现。

本文中的源码是基于JDK1.8的。

二、源码

1. 继承结构及构造方法

首先,我们来看下ConcurrentHashMap的继承机构:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

首先,ConcurrentHashMap继承自AbstractMap,前面学习HashMap的时候曾简单说过,AbstractMap这个类提供了Map接口的一些简单实现,用来减少该接口实现的一些重复操作,以最小化实现该接口所需的工作量;而接口ConcurrentMap,根据文档描述,这个接口是用来提供线程安全性和原子性保证的映射;最后,实现了Serializable,表明该类支持序列化。

public ConcurrentHashMap()
public ConcurrentHashMap(int initialCapacity)
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
public ConcurrentHashMap(int initialCapacity, float loadFactor) 
public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel)

构造方法的话,都比较常规,不多说了。

2. 常用属性

首先说下,JDK1.8之后,ConcurrentHashMap的内部存储结构其实和HashMap是一样的,都是通过数组 + 链表 + 红黑树来实现的。有一些属性和HashMap是相似的,我们这里就简单略过。

// 默认初始化容量
private static final int DEFAULT_CAPACITY = 16;
// 最大容量,2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;

//最大数组容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//负载因子
private static final float LOAD_FACTOR = 0.75f;

//树形化的三个参数,不多说
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;

然后再看下两个比较特殊的属性:

private transient volatile int sizeCtl;

该属性是用于控制数组大小和扩容的标识符,值不同有不同的含义:

  • 如果是负数,表示数组正在初始化或者扩容;如果是-1,表示正在初始化,-N 表示有N-1个线程正在进行扩容操作;
  • 如果是0或者是正数,表示table还没有被初始化,该值表示初始化或下此扩容的大小,类似于扩容阈值;它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。

另外的一个参数:

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

该参数是默认的并发级别,JDK1.8中已经不再使用,只是为了与以前的版本兼容才定义的。由于JDK1.8之前ConcurrentHashMap是通过分段锁来实现的,ConcurrentHashMap由多个Segment组成,而每个Segment都有对应的一把锁来实现线程安全。JDK1.8之后,Segment仍然存在,而该字段是服务于Segment的,所以,两者都是为了兼容原先的版本而存在的。

3. 相关节点类
3.1 Node类

保存table数组的元素,一个Node就是table中的一个元素。该类无法进行修改,只能进行只读遍历操作:

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    //构造方法,get方法
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

Node类的valu及next属性都使用了volatile关键字修饰,表示在并发环境下的修改对其他线程都是可见的,并且Node类提供了一个find方法用于辅助get方法的调用。

3.2 TreeNode和TreeBin
/**
 * Nodes for use in TreeBins
 */
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
}    

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
}  

TreeNode表示红黑树的具体实现,但正如该类的注释而言:Nodes for use in TreeBins,该类只是为TreeBin类服务,最终有TreeBin来操作红黑树。TreeBin用于封装维护TreeNode,包含了一系列对树的操作方法。

4. 实现

ConcurrentHashMap中使用了大量的CAS算法,总体上实现线程安全的方式是通过CAS + synchronized来实现的,至于CAS,可以参考该文章:CAS原理深度分析,接下来我们来分析下几个主要的方法,首先,还是来看下put方法。

4.1 put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 1. key和value都不能为空
    if (key == null || value == null) throw new NullPointerException();
    // 2. 对key的hashCode再进行一次计算得到hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 3. 无限循环操作,直到插入成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空,或表的容量为0,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 表不为空,且表中对应元素已经存在,比较并替换
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果该节点的hash值为MOVED,说明正在进行扩容操作,则当前线程会优先进行帮助扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 其他情况下的操作
        else {
            V oldVal = null;
            // 对hash桶进行加锁操作,
            synchronized (f) {
                // 再次检查表中下标为i的节点是否为同一个节点
                if (tabAt(tab, i) == f) {
                    // 链表节点
                    if (fh >= 0) {
                        // 这里的bitCount = 1
                        binCount = 1;
                        // 又一个无限循环,这里bitCount自增,表示链表中的节点数量
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果节点的hash值相等,并且key也相等,那么先将旧值取出,然后进行替换操作
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 其他情况,如果当前节点的下一个节点为空,也就是最后一个节点
                            // 则将新节点在尾部插入
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果节点是树类型
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        // 这里的bitCount = 2
                        binCount = 2;
                        // 执行树的插入逻辑
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // bitCount判断
            if (binCount != 0) {
                // 大于树形化阈值,进行树形化操作
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 返回旧的值
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // bitCount加1
    addCount(1L, binCount);
    return null;
}

方法有点长,我们来依次说下:

  1. 首先,ConcurrentHashMap的key和value都不能为空,这点和HashMap有点不同;
  2. 首先,计算key的hash值,然后进入循环;如果表为空,初始化表;如果不为空,通过key的hash在表中查找元素,如果找不到,说明该hash桶为空,通过CAS操作将值放入桶中;如果hash值为-1,也即是MOVED,则说明桶正在进行扩容,帮助进行扩容操作;
  3. 上面情况都不满足,则开始进行分段加锁,遍历操作。首先对桶中的第一个节点进行加锁,然后对链表进行遍历,根据hash值和key值进行比较,存在更新,不存在插入进去;如果该节点是红黑树类型,则进行红黑树的插入逻辑;
  4. 判断链表的长度是否达到树形化阈值,如果达到了,进行树形化;

在进行put操作的时候,会涉及到一些方法,我们来简单看下。先看下初始化方法initTable:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果sizeCtl小于0,说明table正在进行初始化或扩容,则当前线程让出CPU资源
        // 让其他线程执行
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 比较SIZECTL与sc,相等设置为-1,不相等说明其他线程正在初始化或者已经初始化完成
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 再次判断数组是否初始化
                if ((tab = table) == null || tab.length == 0) {
                    // 然后比较sc的值是否大于0,大于0则n为sc,否则,n为默认初始容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc = n * 3/4,注意这里无符号右移的计算
                    sc = n - (n >>> 2);
                }
            } finally {
                // 最后设置sizeCtl的值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

从这里可以看到,table的值是与sizeCtl相关的。

然后来看下另一个方法:tabAt方法,该方法是一个CAS方法,为了方便CAS方法的调用,在ConcurrentHashMap中定义了Unsafe的一个实例:

private static final sun.misc.Unsafe U;

而该方法则是用于获取table中下标为i的节点:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

另一个CAS方法casTabAt,则是一个比较交换的操作:

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

比较下标为i的节点是否是c,如果是,替换为v节点;

另外,还涉及到树形化方法treeifyBin,及扩容方法tryPresize,树形化方法就不说了,来看下扩容方法:

private final void tryPresize(int size) {
    // 如果给定的容量大于了最大值的1/2,则直接扩为最大值,否则走tableSizeFor方法
    // tableSizeFor方法是找到大于等于count的最小的2的幂次方
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // sizeCtl大于等于0,扩容
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 同样先判断是否初始化
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            // 比较并设置为-1,表示正在进行初始化
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 如果要扩容的值小于等于原阀值,或现有容量>=最大值,什么都不用做了 
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
4.2 get方法

接下来来看下get方法。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 1. 计算key的hash
    int h = spread(key.hashCode());
    // 2. 对table是否为空判断,并判断key所对应的桶tab[i]是否存在
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 第一个节点
        if ((eh = e.hash) == h) {
            // hash相等,key相等,返回值
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // hash值小于0,去树中查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 否则,去链表中查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法就相对简单些了,与HashMap的get方法差不多:

  1. 计算key的hash值;
  2. 对table进行为空判断,不为空,根据hash定位到table中的位置;
  3. 先检查tab[i]的头节点是否满足,不满足再去遍历树或者链表;
4.3 size方法
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

size方法用于计算Map中的容量,该方法从JDK1.2就有了,不过不建议使用了,因为推荐了新的方法:mappingCount方法。

public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}

在mappingCount的注释中,我们可以看出:

  • 该方法建议用来代替size()方法使用,因为ConcurrentHashMap可能会包含更多的映射结果,即超过int类型的最大值;
  • 该方法返回的是一个估计值,由于存在并发的插入和删除,因此返回值可能与实际值会有出入;

看到源码会发现两者其实调用的都是sumCount方法,看下sumCount的源码:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

最终结果值还是通过属性baseCount计算来的。而baseCount是基于CAS来更新的,所以实际值并不一定是当前Map的真实元素个数。

三、总结

1. 问题

(1)put操作的时候,为什么当hash值为MOVED的时候,桶正在进行扩容操作呢?

  通过查看调用轨迹,我们可以看到是类型为ForwardingNode的节点,构造了一个hash值为MOVED的Node,我们来看下这个类:

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

这个类是一个特殊的Node节点,该节点的hash值是-1,也就是MOVED,该类只会在扩容的时候用到,用于在扩容期间,插入到桶的头部的节点。也可以理解为一个占位符,表示当前节点正在被移动。

2. 总结

JDK1.8中的ConcurrentHashMap的数据结果和HashMap的是一样的,然后通过CAS和 synchronized 关键字来保证线程安全。本次内容只简单了解了ConcurrentHashMap的put和get操作,其他的操作其实步骤都差不太多,等有时间了,再去了解下。

本文参考自:
《Java源码分析》:ConcurrentHashMap JDK1.8
docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
【JUC】JDK1.8源码分析之ConcurrentHashMap(一)
http://www.cnblogs.com/huaizuo/archive/2016/04/20/5413069.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容