Java 源码分析-ConcurrentHashMap(1.8)

  最近总是在分析源码,感觉源码也不是想象上的那么难,今天我来记录一下我对ConcurrentHashMap的理解。这里只敢说记录,不敢说分析,因为ConcurrentHashMap的代码确实有点难以理解,本文不对代码进行死磕,也就是说,可能不会对某一行代码进行死磕,而是对整个ConcurrentHashMap类进行整体的理解。
  本文参考资料:

  1.ConcurrentHashMap源码分析(JDK8版本)
  2. 死磕 Java 并发:J.U.C 之 Java 并发容器:ConcurrentHashMap
  3.深入浅出ConcurrentHashMap1.8
  4.深入分析ConcurrentHashMap1.8的扩容实现
  5.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》

1.为什么要使用ConcurrentHashMap?

  其实,楼主都觉得这个问题问的非常傻逼,为什么使用ConcurrentHashMap呢?当然是为了线程安全呗。其实这个回答是比较笼统的,这里面还有很多的问题没有涉及到,比如说,使用最多的HashMap在多线程模型下的缺点,传统线程安全的HashTable的问题等等。

(1).线程不安全的HashMap

  我们才学习HashMap的时候,就知道HashMap是线程不安全的,但是不知道HashMap由于多线程会导致什么问题。下面,我们来看一段代码:

public class Demo {
    public static void main(String[] args) {
        final Map<String, String> map = new HashMap<>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 100000; i++){
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");

                        }
                    }).start();
                }
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  HashMap在并发执行put操作是会引起死循环,是因为多线程会导致HashMap的Entry链表形成数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

(2).效率低下的HashTable

  HashTable使用synchronized关键字来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法,会进入阻塞或者轮询状态。如果线程1在使用put进行元素添加,线程2不但不能使用put方法添加元素也不能使用get方法来获取元素,所以竞争越激烈效率越低。

(3).ConcurrentHashMap

  相较于笨重的HashTable,ConcurrentHashMap就显得非常高效。ConcurrentHashMap降低了锁的粒度,其中在1.7中,设置了Segment数组,来表示不同数据段,在1.8中,取消了Segment数组,进一步降低了锁的粒度。由于本文是分析1.8的ConcurrentHashMap,所以不对1.7的版本过多的解释。

2.ConcurrentHashMap的模型图

  1.8在1.7的基础上进一步的优化,使得ConcurrentHashMap的锁的粒度进一步降低。我们还是先来看看ConcurrentHashMap的结构图:



  我们看到的是,在1.8的内部,维持了一个Node数组,用来存储数据。实际上,ConcurrentHashMap结构与HashMap的结构类似,基本结构都是数组+链表+红黑树。但是在ConcurrentHashMap里面,红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。

3.ConcurrentHashMap的内部类

  简单的看过ConcurrentHashMap的结构图之后,现在来了解一下ConcurrentHashMap的内部类,这个在后面我们理解源码有一定的帮助。

(1).Node类

  Node类在ConcurrentHashMap的内部类是最基本的类,其中桶里面装的就是Node元素,还有就是TreeBin、TreeNode等都继承于Node类。我们先来看看Node的代码:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

  我们来看一下Node成员变量:

        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

  hash用来存储该key的hash值,但是这里面有三个状态需要注意:

名称 描述
MOVED -1 hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下,后面会详细的解释
TREEBIN -2 hash值为-2的Node,表示当前的节点是TreeBin
RESERVED -3 保留的hash值,用在ReservationNode上面

  key用来存储的key值,val用来存储value值,这个就不用多说。next字段表示下一个Node,这个在出现哈希冲突时,将定位在相同位置上的Node使用链表连接起来。
  我们还需要注意一点是,我们不能够通过调用setValue方法来改变Node的值。

        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

(2).TreeNode类和TreeBin类

  TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。

(3).ForwardingNode类

  这个是辅助类,通常在扩容时用到。此时如果一个线程在进行扩容操作,另一个线程put一个元素进来,如果看到对应的位置上面是ForwardingNode对象的话,那么就参与扩容的操作队列来。其中ForwardingNode对象来源有两个:1.原来的位置为null,但是此时复制操作已经到当前位置的后面了,会将这个原来的桶的这个位置置为ForwardingNode对象;2.原来位置不为null,但是已经操作过这个位置了。
  我们来来ForwardingNode的源码:

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //省略了find方法代码
    }

  在ForwardingNode里面,我们发现了一个nextTable对象,这个对象表示扩容之后的数组,当一个线程访问到了一个ForwardingNode对象,就知道当前正在进行扩容操作,当前这个线程会帮助扩容(扩容分为两步:1.数组容量增大2倍;2.将原数组的元素拷贝到新数组里面去),将原数组的元素复制到这个nextTable里面去。

4.基本成员变量

  我们在分析ConcurrentHashMap的源码时,还是先来看看它基本的成员变量。

变量名 描述
table 桶数组,用来存储Node元素的。默认为null,只在第一次put操作的进行初始化,该数组的长度永远为2的n次方。
nextTable 默认为null,当不为null,表示当前正在进行扩容操作,这个数组就是扩容之后的数组,长度为原数组的两倍。
baseCount 记录map中元素个数,由于是多线程操作,baseCount记录的不准确,所以要结合counterCells 来使用保证记录的正确性。
sizeCtl 表初始化和扩容的控制位。其中,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作;当为0(默认值)时,表示当前table还未使用,此时table为null;当为其余正整数时,表示table的容量,默认是table大小的0.75倍,在代码中使用的是(n - (n>>>2))的方式来计算0.75

5.构造方法

  在ConcurrentHashMap里面,我觉得有两个构造方法需要我们关注,一个是无参数的构造方法,这个构造方法非常的简单,是一个空的构造方法,还有一个就是带初始容量的构造方法:

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

  在这个构造方法里面最主要的是,初始化sizeCtl。在这里保证sizeCtl必须是2的n次方,所以这里调用tableSizeFor方法的目的就是调整sizeCtl大小为2的n次方。

    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

  假设给出大小为100,调用这个方法之后,会将大小调整为256。

6.table数组的初始化

  在前面,对table数组的初始化也有所提及。table不会在构造方法里面进行初始化,而是在第一次put操作时,进行初始化。我们来看看:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //发现table数组尚未初始化,调用initTable方法来对table数组进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
       //省略了其余代码
    }

  在调用put方法时,如果发现table尚未被初始化,会调用initTable方法来初始化。我们来看看initTable方法的代码:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
             //sizeCtl小于0,表示已经有线程在对table数组进行初始化,现在这个线程要做的就是让出cpu时间,
             //全力支持table初始化
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //此时有一个线程在对table数组进行是初始化,如果使用CAS算法对sizeCtl字段更新成功,
            //表示当前线程可以对table数组进行初始化。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

  在initTable方法中,我们发现table数组只能被一个线程初始化。如果一个线程在初始化table数组,会将sizeCtl字段更新为-1,其他线程看到sizeCtl为-1时,就知道当前已经有线程在初始化table数组,他们需要做的是,全力支持那个线程初始化table数组--让出cpu占用时间。
  同时,我们看到当table数组更新成功之后,sizeCtl更新为table数组大小的0.75倍:

sc = n - (n >>> 2);

7.put操作

  我们使用HashMap最多的操作就是put操作和get操作。这里先对put进行分析。我们先总体看一下put的代码:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

  这里,我先将整个put方法的代码贴出,让大家有一个总体上概念,然后再来一一解释,整个put操作的流程。
  整个put操作分为4种情况:
  1.如果当前table没有被初始化,会先初始化,重新进行put操作(循环操作,保证put成功)。
  2.通过hash计算,定位位置,如果位置上为null,表示可以直接放进去。
  3.如果对应位置的元素已经被标记为MOVED,即Node的hash为MOVED,那么表示当前table数组在进行扩容操作,此时,当前的线程会帮助扩容。
  4.如果通过hash计算之后的位置不为null,表示出现了哈希冲突,此时会锁住当前这个位置上的Node对象,然后将新添加的Node添加这个位置链表或者红黑树上。

(1).hash计算

  put操作的第一步就是hash计算,这里的hash计算,我认为分为两步:
  1.调用spread方法将key的hashcode再次hash。
  2.通过(n - 1) & hash方法来计算该元素定位的位置。
  我们先来看看spread方法:

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

  我先来解释一下>>>运算符的含义:
  >>>的基本操作就是左移,然后高位补0,这里的左移表示连符号位都要跟着左移;而>>只是左移数值位,不移动符号位。
  h ^ (h >>> 16)与运算过程如下图:


  异或操作完成之后,然后跟HASH_BITS取与操作,我认为是为了消除符号位的影响。
  至于(n - 1) & hash计算,其实就是hash % n的计算,具体的解释,大家可以参考我的Java 源码分析-ThreadLocal,这篇文章里面有相关的解释。

(2).元素放入数组

  由于初始化table数组在前面已经说了,所以这里直接说赋值的情况。在这种情况下,还有分为两种情况:1.对应位置为null;2.对应位置不为null。这里将将两种情况分别讨论一下。

A.对应位置为null

  这种情况比较简单,直接调用了casTabAt方法进行赋值:

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

  这里使用Unsafe类来对变量进行CAS更新,具体的实现原理,楼主也不是很清楚,实在抱歉!

A.对应位置不为null

  这种情况下,就比较复杂了,我们先来看看代码:

                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }

  这种情况下,新添加的Node肯定会添加到这个位置后面,当然为了保证线程安全,肯定锁住当前的Node,这样将锁的粒度降到了每个Node上,一个Node的put操作不会影响其他Node的操作。
  锁住了之后,当然就要添加Node。这里分为两种情况,一种原来的Node不是TreeBin,也就是说原来就是一个链表,这样直接添加到链表的内部就行了;还有就是原来的Node本身就是一个TreeBin,那么我们通过调用TreeBin内部的方法来添加一个Node,至于红黑树的平衡性,让TreeBin自己来维持,我们外部不需要关注。
  添加完成之后,还需要一个操作,那就是如果当前链表达到了阈值,需要将链表变为红黑树。这一步的解释在后面在详细的说。

8.更新baseCount

  在putVal方法的循环执行完毕之后,一个Node肯定放入进去了,此时就需要调用addCount方法来更新baseCount变量:

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //更新baseCount
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //检测是否需要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

   addCount方法里面分为主要两个操作:1.更新baseCount;2.检测是否扩容。这里由于在讨论更新baseCount,所以不对扩容操作进行详细的解释,之后会做详细的解释。
   更新baseCount的操作分成了两步:1.尝试更新baseCount变量;2.如果更新失败,或者counterCells为null会调用fullAddCount方法进行循环更新。
  我们来看看fullAddCount的代码:

    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
               // 如果counterCells数组对应位置上为null,创建一个cell,
               //放在这个位置上。
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
               //如果对应位置上不为null,尝试更新value值
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                 //如果对应不为null,并且更新失败,表示此时counterCells数组的容量过小,
                //此时需要扩容。
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //初始化counterCells数组
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

  这个方法的代码比较长,理解起来可能麻烦,我来说明整个方法的作用和执行的过程。
  在解释这个方法之前,先解释一下baseCount和counterCells含义。两个都是用记录元素个数,只是记录的时机不同的。当要更新元素个数时,优先更新baseCount,如果baseCount更新成功的话,表示更新元素个数的操作已经完成了;如果更新失败的话,此时会考虑更新counterCells数组中某一个(随机的)cell的value值。因此,map的元素个数 = baseCount + 所有的cell的value值
  在调用fullAddCount方法之前,在addCount方法里面进行了两次尝试:1.尝试更新baseCount;2.尝试更新counterCells数组随机位置上的一个cell的value。如果这两次尝试都失败的话,则需要调用fullAddCount来保证元素个数正确的更新。
  而这个fullAddCount方法的作用就是更新cell的value值。
  整个fullAddCount方法的执行步骤:
  1.如果counterCells为null,先初始化counterCells数组,默认大小为0。
  2.如果counterCells不为null,根据产生的随机,通过(n - 1) & h找到一个位置,如果这个位置为null的话,创建一个cell对象放在这个位置上面。
  3.如果这个位置上面不为null,首先会尝试更新这个cell的value值,如果更新成功的话,表示更新操作完成,否则执行第4步。
  4.如果第3步的更新操作失败,表示此时counterCells数组的容量可能不足以应付这么多的线程了。所以此时counterCells数组需要扩容。

9.扩容操作

  当我们put一个元素之后,如果此时元素已经达到了扩容的阈值了,就需要扩容。我们先来看看addCount方里面的扩容部分:

        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //表示已经有线程在进行扩容操作
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //当前线程是唯一的或是第一个发起扩容的线程  此时nextTable=null
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }

  addcount方法扩容部分有两种情况:1. sc<0情况下,表示当前已经有线程在进行扩容操作了,此时当前这个线程需要参与帮助扩容的任务中来;2. 另一个情况,当前线程只有一个线程准备扩容操作。第一种情况为什么不调用helperTransfer方法来表示帮助扩容的含义,这个原因,我也不是很懂,但是比较了第一种情况的代码与helperTransfer方法的代码,两者其实比较类似。
  现在我们来看看transfer方法究竟为我们做了什么。由于transfer方法代码过长,所以这里不完全展出来,这里使用分段形式来解释。
  首先,如果是第一个线程调用transfer方法进行扩容操作,那么nextTable肯定为null。所以transfer方法的第一步就是创建新数组,新数组的容量是旧数组的两倍:

        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }

  扩容之后,此时需要将原来的数组进行复制操作了。这个过程支持多线程操作。在看这个过程之前,我们先来看看几个变量的含义。

变量名 类型 描述
fwd ForwardingNode 用来作为占位符,表示当前位置已经被处理过
advance boolean advance为true,表示当前这个节点已经被处理,这个与fwd区别在于,advance为false表示当前节点(i位置)可以被处理
finishing boolean finishing为true,表示当前扩容操作已经操作完毕

  接下来,我们来分析一下transfer方法的执行过程。大体的流程是,遍历这个原来的桶数组,然后复制到新数组里面去。
  首先,必须找到一个可以被处理的Node节点,这个节点可以为null,也可以不为null,但是不能为fwd对象,因为如果是fwd对象,表示当前已经被处理过,没必要再去处理。

           while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

  上面的代码中,只要advance为false,就退出while循环,表示当前尝试着处理这个位置上的Node。这里说的是尝试着处理,意思就是,可能不会被处理,比如说,已经有现在在处理了;或者这个位置已经被处理,已经为fwd了。
  首先,如果这个位置上为null的话,那么直接将这个位置设置成fwd。

            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);

  如果当前这个节点已经被处理了,直接将advance设置为true,进行下一次位置的选择。

            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

  其他情况下,表示可以处理这个节点,这里处理的意思,是可以将这个位置上的所有节点拷贝到新数组里面去。这里分为两种情况,我们来一一的分析。
  首先,第一种情况就是节点本身为一个链表结构。

                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //将原来链表上的一部分就放在原位置
                            setTabAt(nextTab, i, ln);
                            //将原来链表上的另一部分放在 i + n的位置上
                            setTabAt(nextTab, i + n, hn);
                            //处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

  整个过程是非常简单,就是将原来的链表分为了两部分,一部分在原来的 i 位置上,一部分在 i+ n的位置上。这里涉及到了喜闻乐见的反转链表,这里不对反转链表做解释,后面会详细的解释反转链表的原因,到时候会回来详细的解释这段代码。
  现在来看看复制操作的第二种类型。如果当前位置上的Node为TreeBin类型,表示为红黑树,此时要做的操作跟链表的操作也是非常类似,将树分为两个部分,一个部分在原来的i位置上,另一个部分在i + n的位置上。

                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //如果此时节点数量小于阈值,那么将红黑树变为链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;

                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;

  最后,如果所有的节点被赋值完毕之后,表示扩容操作已经完成了,此时finishing为true。

            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    //将nextTable置为null,以便下次扩容
                    nextTable = null;
                    table = nextTab;
                   //将sizeCtl的值调整为长度的0.75
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                 //sizeCtl减一,表示当前有个线程参与扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

10.反转链表的原因

  楼主的思路来自于深入分析ConcurrentHashMap1.8的扩容实现,如果大家可以参考一下。刚刚,我们对transfer方法的复制操作有了一个大概的理解,这里将对其详细分析。
  在解释这段代码之前,我们先对这段代码里面几个变量有一个初步的理解。

变量名 类型 描述
runBit int 位置为每个节点的hash值与n进行与操作,经过第一次循环,runBit记录的是最后一个hash值变化的Node的hash值。可能这里拗口,待会举一个详细例子。
lastRun Node 默认值为当前位置第一个Node,经过第一次循环,lastRun记录的是最后一个hash值变化的Node。
ln Node 默认值为null,是放在新数组 i 位置上的链表头结点。
hn Node 默认值为null,是放在新数组 i + n 位置上的链表头结点

  在上面的表格中,提到最后一个hash值变化的Node,这句话可能比较拗口,这里举一个简单的例子来表达意思。



  上图中是一个链表的模型,其中,我将这个链表中的节点分为两类:1.一类是节点的hash值相同的;2.第二类是跟第一类的hash值不同的。所以,从这个链表中我们可以得到,lastRun为节点6,runBit也是节点6与n进行与操作得到的值。为什么是节点6呢?因为在6之后,节点类型没有再变了。

                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //将原来链表上的一部分就放在原位置
                            setTabAt(nextTab, i, ln);
                            //将原来链表上的另一部分放在 i + n的位置上
                            setTabAt(nextTab, i + n, hn);
                            //处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

  结合源码来分析,runBit默认为节点1的hash和n与操作的值。后面依次记录与上一次runBit的不同值,最后通过runBit == 0将所有的节点分为了两类。
  现在我们结合上面的图做一个假设。我们假设紫色节点的hash值&n为0,那么蓝色节点的hash值&n就不为0。经过第一次循环,得出 ln = lastRun = 节点6,hn = null。
  此时我们来分析第二次循环,先来看看紫色节点构成ln链表的过程。


  我看到ln链表只是部分反转了,默认部分是没有反转了。其中得到的ln链表放在新数组的i位置上。
  然后我们再来看看hn链表的构成过程。

  我们发现hn链表是完全反转的。从而,我们得出一个结论,哪个链表默认为null,那个链表就是完全反转的
  最终,我们终于知道了,一个反转链表得出的原因,其实不是故意为之,而是加快复制操作速度,因为lastRun之后的节点进行复制操作不需要创建新节点。

11.get操作

  前面详细的说了put操作和扩容操作,这里简单的分析一下get操作,get操作相比于前面的操作就非常的简单。先来看看源码:

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
             //匹配成功
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
           //当节点为TreeBin或者ForwardingNode
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
           //遍历链表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

  整个get操作分为三种情况:1.table数组指定位置的第一个节点就匹配成功,直接返回;2.table数组指定位置上的hash值小于0,此时当前位置可能已经被其他在扩容时处理过,或者当前位置的Node为一个TreeBin,不管是那种类型的Node,调用的都是find方法来获取Node节点;3.其余情况下,直接遍历链表查找。

12.size操作

  size操作里面,主要有两个方法提供,一个是传统的size方法,一个是ConcurrentHashMap比较提倡的mappingCount方法。我们先来看看size方法:

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

  size方法里面调用sumCount方法,sumCount方法才是真正计算元素个数的方法。我们来看看sumCount方法:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

  可能大家对这个方法计算方式不太好理解,其实这种计算方式取决于前面的设计方式。在前面我说到过,map的元素个数 = baseCount + 所有的cell的value值。这个结论是因为多线程更新个数时,如果只在baseCount里面记录,一个原因是多线程更新可能会出错,另一个原因是竞争性太大了。所以,设计理念就变成了,当baseCount更新失败时,表示baseCount变量当前忙碌,可以将这个增量添加到一个不忙的cell里面去,这样竞争性就降低了。
  我们再来看看mappingCount方法:

    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

  达到的效果感觉跟size都差不多,只不过是一个返回int,一个返回long而已。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容