ConcurrentHashMap源码分析JDK1.7&1.8

一、背景

​ 在JDK1.5版本中引入了一个线程安全高性能Map类—ConcurrentHashMap,主要为了解决HashMap线程不安全和Hashtable效率不高的问题。众所周知,HashMap在多线程编程中是线程不安全的,而Hashtable由于使用了synchronized修饰方法而导致执行效率不高;因此,ConcurrentHashMap的出现是为了在保证线程安全的前提下,提高性能。

二、设计猜想

​ 在探究源码之前,首先我们先尝试用自己的逻辑来设计下ConcurrentHashMap<K,V>的数据结构。在我们熟悉的数据结构中有 一种hash表的数据结构符合ConcurrentHashMap的存储要求

[散列表](Hash table,也叫哈希表),是根据关键码值(Key value)而直接进行访问的数据结构。也就是 说,它通过把关键码值映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做散列函数, 存放记录的数组叫做散列表。

在设计ConcurrentHashMap中需要关注的几个方面:

  • 由于数组大小、类型一旦确定,编译、运行时都不可修改,必须对ConcurrentHashMap的数组动态扩容。
  • 通过Key计算得到的Hash值,必须保证元素都能正确的被保存,需要考虑hash冲突的处理。

三、设计思路

Hashtable效率不高的主要原因是容器操作过程中对整个对象进行加锁,其他线程的任何操作都会被阻塞。解决方式就是从“减少锁的范围”、“引入非阻塞锁”或引入”读写锁“等方面进行优化,针对以上的思路可以进行如下改进:

  1. Hashtable整张表拆分成一张张的小表,操作中只有某个小表受影响,而其他小表可以实现高性能操作。
  2. 小表可以实现非阻塞锁的”读写“锁,在操作小表时,可以减少其他线程的等待时间。

我们暂称以上的设计为“分段锁”技术。

Map的底层数据结构是数组+链表,基于上面的拆表思路,将上面的小表替换成链表的头节点即数组的某个元素。基于以上的方案,可以降低方案一的复杂度和空间使用率。

JDK中提供的ConcurrentHashMap在1.7和1.8的版本中正是基于这个改进思路进行设计的。

四、分段锁技术

ConcurrentHashMap通过持有一个Segment[]数组对象来实现锁分段技术,其中每一个Segment对象通过继承ReentrantLock实现持有非阻塞的重入锁。实则一个Segment对象可以理解为一个HashMap,不同在于Segment持有锁,而HashMap不持有锁。Segment中持有一个HashEntry[]数组,HashEntry是数据节点对象。

ConcurrentHashMap结构如下图:

img

4.1 名词术语

字段 说明 备注
concurrencyLevel 并发级别
segmentShift 段偏移量
segmentMask 段掩码

4.2 源码剖析

4.2.1 容器初始化

​ 容器初始化主要是为完成Segment[]数组对象的初始化,Segment[]数组的大小又依赖于ssize的值,ssize是通过concurrencyLevel计算得到,其中ssize应该满足2的N次方(power-of-two sizes),所以ssize >=concurrencyLevel,此处对于Segment[]数组的大小的约束是为了通过按位与的散列算法来定位Segments数组的索引。(以下省略无关代码)

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
  int ssize = 1;
  while (ssize < concurrencyLevel)
    ssize <<= 1;
  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}

4.2.2 设置值

public V put(K key, V value) {
  Segment<K,V> s;
  if (value == null)
  throw new NullPointerException();
  int hash = hash(key);
  int j = (hash >>> segmentShift) & segmentMask;
  if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
  (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
  s = ensureSegment(j);
  return s.put(key, hash, value, false);
}
  • ConcurrentHashMap的value不能为空,否则抛出NPE异常。
  • 获取key相应的hash值,用于计算操作元素所在的segment[]数组的索引下标j。
  • 通过索引拿到相应的segment对象,进行put操作
计算Key的hash值
private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String))
    return sun.misc.Hashing.stringHash32((String) k);
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h << 3);
  h ^= (h >>> 6);
  h += (h << 2) + (h << 14);
  return h ^ (h >>> 16);
}

ConcurrentHashMap中对key的处理采用的是变体的Wang/Jenkins哈希算法。此算法特点:雪崩性和可逆性。

计算Segment数组下标
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
  int sshift = 0,ssize = 1;
  while (ssize < concurrencyLevel){
    ++sshift;
    ssize <<= 1;
  }
  this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
}

public V put(K key, V value) {
int j = (hash >>> segmentShift) & segmentMask;
}

j指待操作的Segment[]数组的下标,而其中的segmentShiftsegmentMask是容器初始化时完成的赋值。其中的ssize等于Segment[]数组的长度,而sshift满足2^sshif=ssize公式。下标j的值域范围为[0,ssize-1],由于ssize=2^sshif,那么下标j可以用1个sshift位的二进制数字表示。假如:ssize为16,那么sshift=4,j的值域为[0,15],而[0000b,1111b]就是j的值域;则求key在Segment[]的下标j,就是求key对应的一个散列的4位二进制数值。而ConcurrentHashMap的源码求下标j的方式非常简单,就是取key的hash值的高4位。因此,求key散列到长度为ssize的Segment数组的下标j,就是求key的hash值的高sshift位。

1538405541238
初始化Segment对象
private Segment<K,V> ensureSegment(int k) {
  final Segment<K,V>[] ss = this.segments;
  long u = (k << SSHIFT) + SBASE; // raw offset
  Segment<K,V> seg;
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
    Segment<K,V> proto = ss[0]; // use segment 0 as prototype
    int cap = proto.table.length;
    float lf = proto.loadFactor;
    int threshold = (int)(cap * lf);
    HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck
      Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
      while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {
        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
          break;
      }
    }
  }
  return seg;
}
  • 利用UNSAFE原子操作从Segment[]数组中尝试获取对应Segment对象,没有则进行初始化,有则直接返回。
  • Segment对象的初始化以Segment[]数组中头节点的参数和UNSAFE原子操作完成初始化。
存放元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  //尝试获取Segment的Lock,获取成功返回null,否则非阻塞重试3次,超过3次则阻塞等待锁,返回对应的链表节点。   
  HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
  V oldValue;
  try {
    HashEntry<K,V>[] tab = table;
    int index = (tab.length - 1) & hash;
    HashEntry<K,V> first = entryAt(tab, index);
    for (HashEntry<K,V> e = first;;) {
      if (e != null) {
        K k;
        if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
          oldValue = e.value;
          if (!onlyIfAbsent) {
            e.value = value;
            ++modCount;
          }
          break;
        }
        //递归从链表头切换到后续节点
        e = e.next;
      }
      else {
        if (node != null)
          node.setNext(first);
        else
          node = new HashEntry<K,V>(hash, key, value, first);
        int c = count + 1;
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
          rehash(node);
        else
          setEntryAt(tab, index, node);
        ++modCount;
        count = c;
        oldValue = null;
        break;
      }
    }
  } finally {
    unlock();
  }
  return oldValue;
}
  1. 首先尝试获取Segment的Lock,Segment持有的是ReentrantLock非阻塞锁,如果失败则重试获取
  2. 获取链表的头结点,Segment持有的HashEntry[]数组操作的是头结点开始:
    1. 如果头节点e不为空,如果key相同,则更新值,否则执行e = e.next进行链表遍历
    2. 如果头节点为空,则新增节点node,满足node.next=first

4.2.3 容器扩容

private void rehash(HashEntry<K,V> node) {
  HashEntry<K,V>[] oldTable = table;
  int oldCapacity = oldTable.length;
  int newCapacity = oldCapacity << 1;
  threshold = (int)(newCapacity * loadFactor);
  HashEntry<K,V>[] newTable =
    (HashEntry<K,V>[]) new HashEntry[newCapacity];
  int sizeMask = newCapacity - 1;
  for (int i = 0; i < oldCapacity ; i++) {
    HashEntry<K,V> e = oldTable[i];
    if (e != null) {
      HashEntry<K,V> next = e.next;
      int idx = e.hash & sizeMask;
      if (next == null)   //  Single node on list
        newTable[idx] = e;
      else { // Reuse consecutive sequence at same slot
        HashEntry<K,V> lastRun = e;
        int lastIdx = idx;
        for (HashEntry<K,V> last = next;
             last != null;
             last = last.next) {
          int k = last.hash & sizeMask;
          if (k != lastIdx) {
            lastIdx = k;
            lastRun = last;
          }
        }
        newTable[lastIdx] = lastRun;
        // Clone remaining nodes
        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
          V v = p.value;
          int h = p.hash;
          int k = h & sizeMask;
          HashEntry<K,V> n = newTable[k];
          newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
        }
      }
    }
  }
  int nodeIndex = node.hash & sizeMask; // add the new node
  node.setNext(newTable[nodeIndex]);
  newTable[nodeIndex] = node;
  table = newTable;
}

4.2.4 获取值

public V get(Object key) {
  Segment<K,V> s; // manually integrate access methods to reduce overhead
  HashEntry<K,V>[] tab;
  int h = hash(key);
  long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  if ((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab = s.table)!=null) {
    for (HashEntry<K,V> e=(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {
      K k;
      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
        return e.value;
    }
  }
  return null;
}
  • 获取key相应的hash值,通过计算得到索引u,从而定位到Segment[]数组中的Segment对象。
  • 在通过计算得到HashEntry[]数组中的HashEntry对象,再通过循环遍历链表,最终定位到数据。

五、行锁技术

5.1 源码剖析

5.2.1 容器初始化

static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    ……………………
    this.loadFactor = loadFactor;
  this.threshold = tableSizeFor(initialCapacity);
}

在进行初始化容器时,会设置两个参数,initialCapacity表示容器数组的容量,loadFactor表示容器的负载因子。其中tableSizeFor会对initialCapacity参数值进行二进制的位左移,使得容量是2的N次方。

5.2.2 设置值

计算Key的hash值
static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

其中 hash(key) 方法就是计算key的hash值, (h = key.hashCode()) ^ (h >>> 16) 使用高位16位和低16位异或运算得到Hash值,主要为了使hash分布尽可能的均匀。

[图片上传失败...(image-237caa-1576651270043)]

初始化容器数组
private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
      Thread.yield(); // lost initialization race; just spin
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
          @SuppressWarnings("unchecked")
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          table = tab = nt;
          sc = n - (n >>> 2);
        }
      } finally {
        sizeCtl = sc;
      }
      break;
    }
  }
  return tab;
}
  • 通过sc变量来控制初始化过程,sc=-1在进行容器初始化,其他线程循环等待并让出CPU(Thread.yield())
  • 初始化Node[]数组对象,并返回
存放元素
final V putVal(K key, V value, boolean onlyIfAbsent) {
  for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
    }else {
      V oldVal = null;
      synchronized (f) {
        if (tabAt(tab, i) == f) {
          if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
              K ek;
              if (e.hash == hash &&
                  ((ek = e.key) == key ||
                   (ek != null && key.equals(ek)))) {
                oldVal = e.val;
                if (!onlyIfAbsent)
                  e.val = value;
                break;
              }
              Node<K,V> pred = e;
              if ((e = e.next) == null) {
                pred.next = new Node<K,V>(hash, key,
                                          value, null);
                break;
              }
            }
          }
          else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                  value)) != null) {
              oldVal = p.val;
              if (!onlyIfAbsent)
                p.val = value;
            }
          }
        }
      }
      if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
          treeifyBin(tab, i);
        if (oldVal != null)
          return oldVal;
        break;
      }
    }
  }
  addCount(1L, binCount);
  return null;
}
  1. 获取容器中Node[]数组的头结点,如果头结点为空,则构造新节点,并添加到该头结点上(CAS)。
  2. 如果不是头节点,则使用synchronized锁住头节点,循环进行链表尾部添加

5.2.3 容器扩容

https://www.cnblogs.com/menghuantiancheng/p/10462370.html

5.2.4 获取值

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  int h = spread(key.hashCode());
  if ((tab = table) != null && (n = tab.length) >0&&(e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    } else if (eh < 0)
      return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
      if (e.hash == h &&
          ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
  return null;
}
  • 获取key相应的hash值,定位到Node[]数组的索引位,依次循环遍历链表,最终定位到数据。

参考文档:

https://www.cnblogs.com/weiweiblog/archive/2018/09/09/9611271.html

https://blog.csdn.net/majinggogogo/article/details/80260400

https://www.cnblogs.com/lonelyJay/p/9736027.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351