ConcurrentHashMap 源码解析和设计思路

引导:线程不安全场景下,需要使用 Map 的时候,我们第一个想到的 API 估计就是 ConcurrentHashMap,ConcurrentHashMap 内部封装了锁和各种数据结构来保证访问 Map 是线程安全的。

1:类的注释

我们从类注释上大概可以得到如下信息:
1)所有的操作都是线程安全的,我们在使用时,无需再加锁;
2)多个线程同时进行 put、remove 等操作时并不会阻塞,可以同时进行,和 HashTable 不
同,HashTable 在操作时,会锁住整个 Map;
3)迭代过程中,即使 Map 结构被修改,也不会抛 ConcurrentModificationException 异常;
4)除了数组 + 链表 + 红黑树的基本结构外,新增了转移节点,是为了保证扩容时的线程安全的
节点;
5)提供了很多 Stream 流式方法,比如说:forEach、search、reduce 等等。
从类注释中,我们可以看出 ConcurrentHashMap 和 HashMap 相比,新增了转移节点的数据 结构,至于底层如何实现线程安全,转移节点的具体细节,暂且看不出来,接下来我们细看源
码。

2:与JDK7旧版本的主要区别

-----------1:更小的锁粒度
jdk8中摒弃了segment锁,直接将hash桶的头结点当做锁。
1)JDK7旧版本的一个segment锁(相当于一个hashTable),保护了多个hash桶;
2)JDK8版本的一个锁只保护一个hash桶,由于锁的粒度变小了,写操作的并发性得到了极大的提升。

image

-----------2:更高效的扩容
1 更多的扩容线程
扩容时,需要锁的保护。因此:旧版本最多可以同时扩容的线程数是segment锁的个数
而jdk8的版本,理论上最多可以同时扩容的线程数是:hash桶的个数(table数组的长度)。但是为了防止扩容线程过多,ConcurrentHashMap规定了扩容线程每次最少迁移16个hash桶,因此jdk8的版本实际上最多可以同时扩容的线程数是:hash桶的个数/16

2 扩容期间,依然保证较高的并发度
旧版本的segment锁,锁定范围太大,导致扩容期间,写并发度,严重下降。
而新版本的采用更加细粒度的hash桶级别锁,扩容期间,依然可以保证写操作的并发度。

image

3:ConcurrentHashMap的重要结构与方法

ConcurrentHashMap内部,和hashmap一样,维护了一个table数组,数组元素是Node链表或者红黑树.
Node

    static class Node<K,V> implements Map.Entry<K,V> {
        /**
          Node节点的hash值和key的hash值相同
          TreeNode节点的hash值
        **/
        final int hash;
        final K key;
        volatile V val;  //volatile确保了val的内存可见性
        volatile Node<K,V> next;//volatile确保了next的内存可见性
        ...
        ...
    }

TreeNode/TreeBin
Node的子类,红黑树节点,当Node链表过长时,会转换成红黑树

ForwardingNode
Node的子类,扩容时的标记节点,此处不做分析

table数组
transient volatile Node<K,V>[] table;
关于table数组,有3个重要方法:

    //以volatile读的方式读取table数组中的元素
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //以volatile写的方式,将元素插入table数组
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    //以CAS的方式,将元素插入table数组
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        //原子的执行如下逻辑:如果tab[i]==c,则设置tab[i]=v,并返回ture.否则返回false
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    

jdk8的实现中,锁的粒度是hash桶,因此对table数组元素的读写,大部分都是在没有锁的保护下进行的,那么该如何保证table数组元素的内存可见性?
1)要解决这个问题,我们要了解如下2点:
a. volatile:对一个volatile变量的写一定可见(happens-before)于随后对它的读
b. CAS同时具有volatile读和volatile写的内存语义
然后,我们看下put方法,是如何利用上述规则,在没有锁的情况下,保证table数组元素的内存可见性:


image

如上图所示,在不加锁的情况下:线程A成功执行casTabAt操作后,随后的线程B可以通过tabAt方法立刻看到table[i]的改变。原因如下:线程A的casTabAt操作,具有volatile读写相同的内存语义,根据volatile的happens-before规则:线程A的casTabAt操作,一定对线程B的tabAt操作可见。

2)我们再思考如下问题:ConcurrentHashMap中的锁是hash桶的头结点,那么当多个put线程访问头结点为空的hash桶时,在没有互斥锁保护的情况下,多个put线程都会尝试将元素插入头结点,此时如何确保并发安全呢?


image

假设此时有2个put线程,都发现此时桶为空,线程一执行casTabAt(tab,i,null,node1),此时tab[i]等于预期值null,因此会插入node1。随后线程二执行casTabAt(tba,i,null,node2),此时tab[i]不等于预期值null,插入失败。然后线程二会回到for循环开始处,重新获取tab[i]作为预期值,重复上述逻辑。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        ...
        for (Node<K,V>[] tab = table;;) {
            ...
            //key定位到的hash桶为空
            if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //cas设置tab[i]的头结点。
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;   //设置成功,跳出for循环
                //设置失败,说明tab[i]已经被另一个线程修改了。回到for循环开始处,重新判断hash桶是否为空。如何往复,直到设置成功,或者hash桶不空。
            }else{
               synchronized (f) {
                   //
               }
                
            }
        }
        ...
    }

以上通过for循环+CAS操作,实现并发安全的方式就是无锁算法(lock free)的经典实现
java原子类的自增操作,也是通过for循环+CAS操作的方式实现的:

    //JDK7版本的 AtomicInteger 类的原子自增操作
    public final int getAndIncrement() {
        for (;;) {
            //获取value
            int current = get();
            int next = current + 1;
            //value值没有变,说明其他线程没有自增过,将value设置为next
            if (compareAndSet(current, next))
                return current;
            //否则说明value值已经改变,回到循环开始处,重新获取value。
        }
    }
4:ConcurrentHashMap的get方法

get方法同样利用了volatile特性,实现了无锁读
查找value的过程如下:
1)根据key定位hash桶,通过tabAt的volatile读,获取hash桶的头结点。
2)通过头结点Node的volatile属性next,遍历Node链表
3)找到目标node后,读取Node的volatile属性val
可见上述3个操作都是volatile读,因此可以做到在不加锁的情况下,保证value的内存可见性

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            //定位目标hash桶,通过tabAt方法valatile读,读取hash桶的头结点
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //第一个节点就是要找的元素
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    //e.val也是valatile
                    return e.val;
            }
            //特殊节点(红黑树,已经迁移的节点(ForwardingNode)等
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍历node链表(e.next也是valitle变量)
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
5:ConcurrentHashMap的put方法

1)由于锁的粒度是hash桶,多个put线程只有在请求同一个hash桶时,才会被阻塞。请求不同hash桶的put线程,可以并发执行。
2)put线程,请求的hash桶为空时,采用for循环+CAS的方式无锁插入。


image
   public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //for循环+CAS操作
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //hash桶(tab[i])为空
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //CAS设置tab[i],不需要加锁
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;       // no lock when adding to empty bin
            }
            //hash桶(tab[i])是fwd节点,表示正在扩容
            else if ((fh = f.hash) == MOVED)
                //帮其扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //hash桶不为空,对tab[i]中的头结点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //node链表
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //key-value entry已经存在,更新value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //是尾节点,则插入
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //size属性+1,如果size属性大于扩容阈值(sizeCtl)则扩容
        addCount(1L, binCount);
        return null;
    }
6:ConcurrentHashMap的扩容实现机制

dk8中,采用多线程扩容。整个扩容过程,通过CAS设置sizeCtl,transferIndex等变量协调多个线程进行并发扩容。
1)nextTable(扩容期间,将table数组中的元素 迁移到 nextTable。)

    /**
     * The next table to use; non-null only while resizing.
       扩容时,将table中的元素迁移至nextTable . 扩容时非空
     */
    private transient volatile Node<K,V>[] nextTable;

2)sizeCtl属性(多线程之间,以volatile的方式读取sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。通过cas设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。)
不同状态,sizeCtl所代表的含义也有所不同。

  • 未初始化:
    • sizeCtl=0:表示没有指定初始容量。
    • sizeCtl>0:表示初始容量。
  • 初始化中:
    • sizeCtl=-1,标记作用,告知其他线程,正在初始化
  • 正常状态:
    • sizeCtl=0.75n ,扩容阈值
  • 扩容中:
    • sizeCtl < 0 : 表示有其他线程正在执行扩容
    • sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容
      ConcurrentHashMap的状态图如下:


      image

3)transferIndex属性

    private transient volatile int transferIndex;
     /**
      扩容线程每次最少要迁移16个hash桶
     */
    private static final int MIN_TRANSFER_STRIDE = 16;

扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地获取迁移任务(hash桶)。

a)在扩容之前,transferIndex 在数组的最右边 。此时有一个线程发现已经到达扩容阈值,准备开始扩容。


image

b)扩容线程,在迁移数据之前,首先要将transferIndex左移(以cas的方式修改 transferIndex=transferIndex-stride(要迁移hash桶的个数)),获取迁移任务。每个扩容线程都会通过for循环+CAS的方式设置transferIndex,因此可以确保多线程扩容的并发安全。

image

换个角度,我们可以将待迁移的table数组,看成一个任务队列,transferIndex看成任务队列的头指针。而扩容线程,就是这个队列的消费者。扩容线程通过CAS设置transferIndex索引的过程,就是消费者从任务队列中获取任务的过程。为了性能考虑,我们当然不会每次只获取一个任务(hash桶),因此ConcurrentHashMap规定,每次至少要获取16个迁移任务(迁移16个hash桶,MIN_TRANSFER_STRIDE = 16)

4)ForwardingNode节点
标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕,关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据

     static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //hash值为MOVED(-1)的节点就是ForwardingNode
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //通过此方法,访问被迁移到nextTable中的数据
        Node<K,V> find(int h, Object k) {
           ...
        }
    }
7:ConcurrentHashMap何时扩容

1)当前容量超过阈值

  private final void addCount(long x, int check) {
        ...
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //s>=sizeCtl 即容量达到扩容阈值,需要扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
               //调用transfer()扩容
               ...
            }
        }
    }

2)当链表中元素个数超过默认设定(8个),当数组的大小还未超过64的时候,此时进行数组的扩容,如果超过则将链表转化成红黑树

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        ...
        if (binCount != 0) {
                    //链表中元素个数超过默认设定(8个)
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
        }
        ...
 }
      
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //数组的大小还未超过64
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                //扩容
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                //转换成红黑树
                ...
            }
        }
    }

3)当发现其他线程扩容时,帮其扩容

   final V putVal(K key, V value, boolean onlyIfAbsent) {
      ...
       //f.hash == MOVED 表示为:ForwardingNode,说明其他线程正在扩容
       else if ((fh = f.hash) == MOVED)
           tab = helpTransfer(tab, f);
      ...
   }
8:ConcurrentHashMap扩容过程分析

1)线程执行put操作,发现容量已经达到扩容阈值,需要进行扩容操作,此时transferindex=tab.length=32


image

2)扩容线程A 以cas的方式修改transferindex=31-16=16 ,然后按照降序迁移table[31]--table[16]这个区间的hash桶


image

3)迁移hash桶时,会将桶内的链表或者红黑树,按照一定算法,拆分成2份,将其插入nextTable[i]和nextTable[i+n](n是table数组的长度)。 迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其他线程,此节点已经迁移完毕。


image

相关代码如下:

  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
              ...//省略无关代码
              synchronized (f) {
                      //将node链表,分成2个新的node链表
                      for (Node<K,V> p = f; p != lastRun; p = p.next) {
                          int ph = p.hash; K pk = p.key; V pv = p.val;
                          if ((ph & n) == 0)
                              ln = new Node<K,V>(ph, pk, pv, ln);
                          else
                              hn = new Node<K,V>(ph, pk, pv, hn);
                      }
                      //将新node链表赋给nextTab
                      setTabAt(nextTab, i, ln);
                      setTabAt(nextTab, i + n, hn);
                      setTabAt(tab, i, fwd);
              }
              ...//省略无关代码
  }

4)此时线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。


image

5)线程2加入扩容操作


image

6)如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回。
* 发现transferIndex=0,即所有node均已分配
* 发现扩容线程已经达到最大扩容线程数

image

参考:https://www.jianshu.com/p/5bc70d9e5410

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容