concurrenthashmap

concurrenthashmap1.7 1.8差别

  • 1.7采用segment分段锁和hashentry数组的形式,将数据一段一段的存储,并且给每一段数据分配锁,当访问某一个数据资源时只会锁住当前segment下的数据。
    concurrenthashmap定位一个元素时,要经过两次哈希,第一次哈希定位到segment分段锁,第二次哈希定位到链表的头结点。
  • 1.8
  1. concurrenthashmap采用了和hashmap一样的形式,数组+链表/红黑树的数据结构。
  2. 放弃了segment分段锁这样的操作,使用node数组保存键值对,底层使用CAS操作 采用unsafe方法来保证获取数据的安全性,另外在添加节点数据时会采用synchronized关键字来加锁。
  3. 1.8 使用了一些volitale关键字修饰的变量来进行操作,比如sizeCtl变量,这个变量用来判断是否有线程在进行初始化操作,当为-1的时候说明有线程正在初始化,当前线程放弃初始化操作。另外还有一个MOVED变量来判断当前数组是否在被扩容,如果为-1情况下,说明当前正在对这个数组进行扩容,当前线程也会参与进去帮助复制。

1.7

concurrenthashmap.png

1.7采用segment分段锁以及hashentry数组的形式。将数据分成一段一段存储,并且给每一段数据分配一个锁。当访问某一个数据时,只会锁住当前segment下的数据。
因此concurrenthashmap定位一个元素时,需要两次哈希。第一次哈希定位到segment分段锁,第二次哈希定位到链表的头结点。

1.8

jdk1.8 concurrenthashmap 参考了hashmap的实现,采用了数组加链表/红黑树的结构。
1.8中放弃了分段锁这样的机制,采用Node数组来保存键值对+CAS操作:采用unsafe方法保证数据的获取+Synchronized关键字对数据来进行加锁。
node中保存着hash值,key值与value值。

        Node(int hash, K key, V val) {
            this.hash = hash;
            this.key = key;
            this.val = val;
        }

        Node(int hash, K key, V val, Node<K,V> next) {
            this(hash, key, val);
            this.next = next;
        }

sizeCtl:

  • -1表示初始化
  • -(1+n) n:表示活动的扩张线程
/**
     * 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
     * 当为负的时候,说明表正在初始化或扩张,
     *     -1表示初始化
     *     -(1+n) n:表示活动的扩张线程
     */
    private transient volatile int sizeCtl;

构造方法

node数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容.在开始初始化的时候,首先判断sizeCtl的值,如果sizeCtl < 0,说明有线程在初始化,当前线程便放弃初始化操作。否则,将SIZECTL设置为-1,Hash表进行初始化。初始化成功以后,将sizeCtl的值设置为当前的容量值。

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

//当出入一个Map的时候,先设定sizeCtl为默认容量,在添加元素
public ConcurrentHashMapDebug(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
 /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl小于0,正在初始化
            if ((sc = sizeCtl) < 0)
                //调用yield()函数,使线程让出CPU资源
                Thread.yield(); // lost initialization race; just spin
            //设置SIZECTL为-1,表示正在初始化
            else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); //n-(1/4)n,即默认的容量(n * loadFactor)
                    }
                } finally {
                    sizeCtl = sc; //重新设置sizeCtl
                }
                break;
            }
        }
        return tab;
    }

put 过程

  • 添加键值对时首先会判断node数组是否初始化,
  • 如果没有的话就初始化数组
  • 然后通过计算hash值来确定放在数组的哪个位置(hash值计算和hashmap一样)
  • 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来
  • 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
  • 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作

扩容机制

第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。

当有线程进行put操作时,如果正在进行扩容,可以通过helpTransfer()方法加入扩容。也就是说,ConcurrentHashMap支持多线程扩容,多个线程处理不同的节点。

  1. 如果新增节点之后,所在链表的元素个数达到了阈值 8,则会把链表转换成红黑树,如果当前长度小于64则会进行扩容。
  2. 在每次添加完元素的addCount方法中,也会判断当前数组中的元素是否达到了sizeCtl的量,如果达到了的话,则会进入transfer方法去扩容

get 方法

通过键值的hash计算索引位置,如果满足条件,直接返回对应的值;

如果相应节点的hash值小于0 ,即该节点在进行扩容,直接在调用ForwardingNodes节点的find方法进行查找。

否则,遍历当前节点直到找到对应的元素。

调用transfer实现扩容

·tryPresize是在treeIfybin和putAll方法中调用,treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。

·helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容

·addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。

在扩容的时候,可以不可以对数组进行读写操作呢?

事实上是可以的。当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。

如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。

多个线程又是如何同步处理的呢

在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe CAS操作来完成的。

  • 在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe CAS的方法,来达到并发安全的目的
  • 在某个位置添加节点时采用synchronized来加锁
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。