ConcurrentHashMap源码分析

ConcurrentHashMap是线程安全的hashMap的一个高效实现,相比与hashTable每次put操作都会对整个哈希表加synchronize锁,ConcurrentHashMap尽量使用CAS操作来保证线程安全,遇到一定要加锁的情况也只会对哈希表中的一个槽加锁,不影响其他槽的读写,保证线程安全的同时大大提高了效率。

jdk7中的实现

简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。


ConcurrentHashMap的缺点

1.哈希表的大小size需要通过计算得到(使用了一个类似LongAdder的结构来保存键值对个数),无法直接返回,且不一定准确。

2.遍历时弱一致性:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的。相比之下,对于HashMap,如果迭代器遍历时发生修改,会因为failfast机制抛出ConcurrentModificationException 异常。


成员变量和内部类

虽然ConcurrentHashMap新增了一些成员变量和内部类,但整个容器存储键值对的数据结构与HashMap相同,都是Node数组+链表+红黑树的数据结构,新增的结构都是为了保证并发安全。

Node类

ConcurrentHashMap中的Node类与HashMap中的Node类相同,用于存储键值对,区别在于将value和next设置成了volatile。在ConcurrentHashMap中,所有普通结点的哈希值都为正数。

特殊节点

    TreeNode:当哈希表中一个槽中的链表树化后,槽中的头结点升级为TreeNode,TreeNode的哈希值为-2。

    ForwardingNode:标记节点,用于ConcurrentHashMap扩容时放入到原table数组的槽中,表示该槽的链表已经搬迁完毕。该结点的哈希值为-1。

ReservationNode:标记结点,当使用ConcurrentHashMap的compute相关函数时,会将该结点放入槽中作为占位符,该结点的哈希值为-3.


核心成员变量


table是一个volatile修饰的Node数组,与HashMap一样,用于存储键值对。

nextTable是在扩容时新建的数组,它只在扩容期间存在,扩容结束后会将nextTable赋值给table,然后将其置为空。

sizeCtl有多种用途,初始化是默认值为0,如果调用者指定了ConcurrentHashMap的初始容量,sizeCtl就用于存储这个初始容量值。正常使用过程中,sizeCtl用于记录哈希表下次扩容的threshold。当table初始化时,sizeCtl被置为-1表示table数组正在被初始化。扩容时,sizeCtl值为-(参与扩容的线程数+1),记录扩容当前哈希表的线程数。

baseCount,cellsBusy,counterCells都是用来记录当前ConcurrentHashMap的键值对个数,每个变量的作用与LongAdder相同。


核心方法

构造函数

ConcurrentHashMap与HashMap一样用的懒加载,构造函数只是将初始容量赋值给sizeCtl,真正初始化table数组是在第一次put函数时。从源码中可以看出,initialCapacity会经过计算再赋值给sizeCtl,ConcurrentHashMap的实际初始容量不等于initialCapacity。


initTable函数

该函数才真正初始化了table数组,逻辑也非常简单,就是新建一个Node数组赋值给成员变量table,再将sizeCtl设置为下次扩容的阈值,即容量的四分之三。该方法主要使用了一个CAS锁,即线程必须成功CAS将sizeCtl置为-1,才能进入初始化的代码逻辑。这样保证了只有有一个线程初始化table数组,而其他线程要么因为yield让出CPU,要么while中无限空转,直到table数组被创建好后直接返回。

spread函数:

用于计算键的哈希码,在HashMap的计算算法之上与HASH_BITS做与运算,即将哈希码的最高位置0,这样保证了所有普通结点的哈希码都是正数,与特殊的结点区分开。

get函数

与HashMap的查找逻辑基本一致,先找到需要查找的key的哈希值对应table数组的哪个槽,然后遍历该槽的链表,查找是否存在该key的结点。

与HashMap不同的地方主要有两点:

1.ConcurrentHashMap中读取或写入槽链表头结点都要专门的方法(tabAt,casTabAt,setTabAt),它们在底层都是调用了UnSafe方法来读写volatile对象,保证了并发安全。    

2.当槽中为特殊结点时(哈希值为负),调用了Node的find方法来查找结点,而特殊结点都会重写这个find方法,实现各自的查找逻辑。这里分为两种情况:

头结点是一个TreeNode,则线程调用findTreeNode方法遍历红黑树查找结点。


头结点是一个ForwardingNode,则表示当前的哈希表正在扩容,并且该槽中的所以结点已经移植到了nextTab中。ForwardingNode中重写的find方法会在nextTab中查找该结点。


put函数:

与HashMap不同的是,在ConcurrentHashMap中,插入的键值对,键和值都不能为null,否则抛出异常。


put函数的运行过程可分为三步。

第一步,计算key的哈希值。

第二步,进入一个for的死循环,直到成功插入或修改value值后才会跳出循环。

for循环中是四个并列的if/elseif/else语句块,每次循环都只会进入一个if模块。

情况1.table为null,调用initTable进行初始化,初始化成功后进入下次循环。


情况2.key对应的table槽为空,CAS新建Node结点插入槽中,跳出循环。


情况3.key对应table槽的头结点是一个forwardingNode,说明table数组正在扩容,线程进入helpTransfer方法,该方法会调用transfer方法让当前线程先帮忙扩容,扩容结束后进入下次循环继续插入结点。


情况4.key对应的table槽已经有结点。这种情况线程会先获取synchronize锁,插入逻辑与HashMap基本相同,如果该槽是一个红黑树,则将结点插入到红黑树中,如果是链表,则遍历链表,存在key就修改value值,不存在就插入到链表尾端,之后判断是否需要树化链表。

注意,synchronize锁住的对象是槽中的头结点而不是整个HashMap。

第三步,进入addCount函数。

该函数主要做两件事1.将ConcurrentHashMap计数加一,其代码逻辑与LongAdder完全一样。2.如果超过阈值,修改sizeCtl,进入transfer函数扩容。

transfer函数支持多线程执行,外围函数调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。


transfer函数:

该函数用于将原来的 tab 数组的元素迁移到新的 nextTab 数组中。transfer函数是以tab数组中的一个槽作为任务单位去迁移的,假设原数组长度为 n,就有 n 个迁移任务。每次线程只负责迁移一部分任务:第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。

第一个调用此方法的线程,参数 nextTab 为 null,会先初始化nextTab,将容量翻倍。

之后进入进入for循环,逐个任务地搬迁table,循环可以分为三块逻辑。

第一块用于给线程分配迁移的任务槽区间和确定下一个迁移的任务槽。第一次进入该逻辑,会执行第二个else if语句,CAS更新 transferIndex向前移动stride,然后确定当前线程需要迁移的数组区间,从nextIndex-1到nextIndex-stride。之后每次进入该逻辑,只要区间中的任务还没完成,都是简单地进入第一个if语句,将i减1后跳出循环,执行i对应槽的迁移工作。i会从nextIndex-1遍历到nextIndex-stride。

当线程将分配的任务区间所有槽都迁移完成后,又会进入第二个else if语句,给当前线程分配下一个任务区间,直到transferIndex<=0,也就是所有的槽都迁移完毕或正在被其他线程迁移,执行第一个else if语句,将i置位-1退出循环,使线程进入第二块逻辑。

第二块用于线程退出transfer函数时做收尾工作,只有在所有槽都完成迁移或正在被其他线程迁移时,当前线程无迁移工作可做才会进入这块逻辑。如果所有迁移工作都完成,finish为true。则将nextTab置空,更新新的table和新的sizeCtl,否则,只需要CAS让sizeCtl-1,表示参与transfer的线程数量减少一个即可。

第三块,就是将table[i]中的链表做迁移工作。线程会先获取槽中头结点的 锁,迁移的逻辑和HashMap中相同。完成迁移工作后,table[i]会为空,线程就会在该槽中放置一个forwarding结点,表示该任务槽已经完成迁移。下轮循环中线程遇到forWarding结点,就会将advance置为true,进入第一块逻辑寻找下一个迁移槽。


size函数:

调用了sumCount函数,返回当前哈希表有多少键值对。实现逻辑与LongAdder一样,返回baseCount和counterCells数组所有成员计数的总和。从源码可以看出1.size函数需要计算,不是在常量时间返回。2.函数没有加任何锁,在统计过程中可能有线程并发修改了ConcurrentHashMap,因此返回的结构不一定准确。


总结

ConcurrentHashMap所有结点都用volatile修饰,并且普通结点哈希值都是正数,这是因为设置一些哈希值为负数的特殊结点。

初始化:使用 cas 来保证并发安全,懒惰初始化 table。

树化:当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头。

put:如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,用 synchronized 锁住链表头进行后续 put 操作,元素添加至 bin 的尾部。如果发现结点是 ForwardingNode,会帮助扩容。

get:无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新table 进行搜索。

扩容:扩容时以 bin 为单位进行迁移的,迁移一些bin的时候不影响其他没有迁移的bin并发读写。需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容,ConcurrentHashMap中会为每个线程分配一定数量的bin进行搬迁,搬迁完后会继续分配,直到所有bin都已经被分配完,当前线程才退出扩容逻辑。

size:元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加即可。


原子更新

ConcurrentHashMap是相对线程安全的,对于ConcurrentHashMap的更新操作(如下,get-计算-put)不是线程安全的,因为无法保证这三个操作的原子性。

ConcurrentHashMap的错误使用示例,非线程安全

ConcurrentHashMap提供了线程安全的更新操作,如compute,merge函数。


compute函数

 compute 函数接收一个key和一个计算value新值的函数(lambda表达式)作为参数。这个lambda表达式接收key和之前的value作为参数(如果哈希表中没有key,value为为null)。compute函数可以看做是一个加强版的put函数,put是提供定新的value值,compute是提供计算新value的函数。compute函数保证整个新value的计算和更新操作是原子性的。

源码注释:


使用示例:将哈希表中key对应的value加1,如果没有key,就将新的结点value置1。

源码分析:在源码实现上,只需要把put函数源码新value的赋值的地方改成调用计算value函数即可。除此之外的一个区别是,如果key对应的槽为空。compute向槽中插入新结点前,会将放置一个ReservationNode占位。

computeIfPresent,computeIfAbsent是compute函数的变种,只会在传入的key值在哈希表中存在/不存在时才会调用运算函数修改/插入新的value值。


merge函数

merge函数在compute的基础上提供了一个value参数。当哈希表中没有key,merge做插入操作时,使用传入的value值。做修改操作时,才会调用传入的函数。如果传入的函数返回null,则将该结点从哈希表中移除。


ConcurrentHashSet

J.U.C没有提供ConcurrentHashSet,但可以通过ConcurrentHashMap的newKeySet返回一个线程安全的高效的hashSet。同样这实际上是 ConcurrentHashMap<K, Boolean>的一个包装器,所有映射值都为 Boolean.TRUE。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。