ConcurrentHashMap JDK1.8原理分析

1.7与1.8区别

1.7采用分段锁的概念,如下图所示,每段包含多个节点,并且都是加悲观锁


1.7

1.8同样是采用分段锁的思想,只是这次将分段锁的粒度降低到节点级别,并且采用了部分CAS乐观锁的操作,大大提升了并发性能
数据结构沿用了与它同时期的HashMap版本的思想,底层依然由数组+链表+红黑树的方式思想。
有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值


1.8

重点变量

    /**
     * 盛装Node元素的数组 它的大小是2的整数次幂
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;
        
        /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     hash表初始化或扩容时的一个控制位标识量。
     负数代表正在进行初始化或扩容操作
     -1代表正在初始化
     -N 表示有N-1个线程正在进行扩容操作
     正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
     
     */
    private transient volatile int sizeCtl; 

Node

Node是ConcurrentHashMap最核心的内部类,每个Node可以理解为数组中的一个节点

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;//带有同步锁的value
        volatile Node<K,V> next;//带有同步锁的next指针

get

因为Node的val值域是volatile的,所以无需加锁就可以得到节点的最新值

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //根据hash值确定节点位置
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点  
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //如果eh<0 说明这个节点在树上 直接寻找
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
             //否则遍历链表 找到对应的值并返回
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

put

简洁说明:

  • 根据给定的key的hash值找到其在table中的位置index
  • 找到位置index后,根据以下情况进行存储或者帮助扩容后存储
  1. 如果当前正在扩容,则优先帮助扩容
  2. 如果table[index]位置没有元素,则直接通过CAS存储
  3. 如果table[i]存储的是一个链表:如果链表不存在key则直接加入到链表尾部;如果存在key则更新其对应的value;如果存入后链表元素>8,还需要将链表转换为红黑树
  4. 如果table[i]存储的是一个红黑树,则按照红黑树方式插入

其中3跟4需要synchronized对头节点进行加锁

public V put(K key, V value) {  
        return putVal(key, value, false);  
    }  
  
    /** Implementation for put and putIfAbsent */  
    final V putVal(K key, V value, boolean onlyIfAbsent) {  
            //不允许 key或value为null  
        if (key == null || value == null) throw new NullPointerException();  
        //计算hash值  
        int hash = spread(key.hashCode());  
        int binCount = 0;  
        //死循环 何时插入成功 何时跳出  
        for (Node<K,V>[] tab = table;;) {  
            Node<K,V> f; int n, i, fh;  
            //如果table为空的话,初始化table  
            if (tab == null || (n = tab.length) == 0)  
                tab = initTable();  
            //根据hash值计算出在table里面的位置   
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
                //如果这个位置没有值 ,直接放进去,不需要加锁  
                if (casTabAt(tab, i, null,  
                             new Node<K,V>(hash, key, value, null)))  
                    break;                   // no lock when adding to empty bin  
            }  
            //当遇到表连接点时,需要进行整合表的操作  
            else if ((fh = f.hash) == MOVED)  
                tab = helpTransfer(tab, f);  
            else {  
                V oldVal = null;  
                //结点上锁  这里的结点可以理解为hash值相同组成的链表的头结点  
                synchronized (f) {  
                    if (tabAt(tab, i) == f) {  
                        //fh〉0 说明这个节点是一个链表的节点 不是树的节点  
                        if (fh >= 0) {  
                            binCount = 1;  
                            //在这里遍历链表所有的结点  
                            for (Node<K,V> e = f;; ++binCount) {  
                                K ek;  
                                //如果hash值和key值相同  则修改对应结点的value值  
                                if (e.hash == hash &&  
                                    ((ek = e.key) == key ||  
                                     (ek != null && key.equals(ek)))) {  
                                    oldVal = e.val;  
                                    if (!onlyIfAbsent)  
                                        e.val = value;  
                                    break;  
                                }  
                                Node<K,V> pred = e;  
                                //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部  
                                if ((e = e.next) == null) {  
                                    pred.next = new Node<K,V>(hash, key,  
                                                              value, null);  
                                    break;  
                                }  
                            }  
                        }  
                        //如果这个节点是树节点,就按照树的方式插入值  
                        else if (f instanceof TreeBin) {  
                            Node<K,V> p;  
                            binCount = 2;  
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
                                                           value)) != null) {  
                                oldVal = p.val;  
                                if (!onlyIfAbsent)  
                                    p.val = value;  
                            }  
                        }  
                    }  
                }  
                if (binCount != 0) {  
                    //如果链表长度已经达到临界值8 就需要把链表转换为树结构  
                    if (binCount >= TREEIFY_THRESHOLD)  
                        treeifyBin(tab, i);  
                    if (oldVal != null)  
                        return oldVal;  
                    break;  
                }  
            }  
        }  
        //将当前ConcurrentHashMap的元素数量+1  
        addCount(1L, binCount);  
        return null;  
    }  
      

size

ConcurrentHashMap 中键值对的个数通过求 baseCount 与 counterCells 非空元素的和得到

    /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     * 当没有争用时,使用这个变量计数。
     */
    private transient volatile long baseCount;

    /**
     * Table of counter cells. When non-null, size is a power of 2.
     */
    private transient volatile CounterCell[] counterCells;

static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

一个 volatile 的变量,在 addCount 方法中会使用它,而 addCount 方法在 put 结束后会调用。在 addCount 方法中,会对这个变量做 CAS 加法
但是如果并发导致 CAS 失败了,使用 counterCells
如果使用 counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功

这种方式目的是降低更新size时的冲突,提升性能

扩容

整个扩容操作分为两个部分
第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的
第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:
如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上,然后在原table中的i位置放入forwardNode节点
如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreeify,把处理的结果分别放在nextTable的i和i+n的位置上,然后在原table中的i位置放入forwardNode节点
遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

再看一下多线程是如何完成的:
多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到ForwardingNode节点,就向后遍历。

image.png

Key和Value不允许null值

ConcurrentHashmap和Hashtable都是支持并发的,这样会有一个问题,当你通过get(k)获取对应的value时,如果获取到的是null时,你无法判断,它是put(k,v)的时候value为null,还是这个key从来没有做过映射。
HashMap是非并发的,可以通过contains(key)来做这个判断。
而ConcurrentHashMap在调用m.containsKey(key)和m.get(key),这两个方法都是没有加锁的,调用时候m可能被其他线程改变了。
假如一个线程m.containsKey(k)为真,在还没执行m.get(k)的时候,k被另外一个线程给删除了,那么m.get(k)会返回null。如果允许null值的话,就会错误的判断为k还存在;因此不允许null值的话就可以正常的表示出当前的k是不存在的。所以在ConcurrentHashMap不应该有如下的写法,Key和Value不允许null值。
其实Value不允许null值就可以,Key为null似乎没什么影响,作者一起排除null我也不知道什么原因。

if (m.containsKey(k)) {
   return m.get(k);
} else {
   throw new KeyNotPresentException();
}

总结

  • Node级别分段锁
  • 读不加锁
  • 写不一定需要加锁
  • 可多线程扩容
  • size计算方式特殊
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容