为使笔记尽量简洁, 省略部分非必要代码
一, 常量
// 默认初始容量 16
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认加载因子 0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认并发级别 16 (范围在2的1次方到16次方)
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 获取锁前的重试次数 2 ??作用未知
static final int RETRIES_BEFORE_LOCK = 2;
二, 变量
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
三,内部类
1, segment 分段
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// table的大小表示着当前分段的容量, 类似HashMap中的数组
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
// 下次扩容阀值 = table的大小 * 加载因子
transient int threshold;
// 加载因子
final float loadFactor;
... 方法略 ...
}
2, HashEntry
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
/**
* 设置下一节点
*/
final void setNext(HashEntry<K,V> n) { 略 }
... 方法略 ...
}
CurrentHashMap里面具有一个分段的数组, 每个分段又有一个键值对的数组, 大致结构如下图:
二, 主要方法
1, 初始化
/**
* 参数含义: 容量, 加载因子, 并发等级
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
-- 参数校验 略
// 1, 按并发等级计算segment数组的大小 : 2的n次方且不小于并发等级
// Find power-of-two sizes best matching arguments
int sshift = 0; // 偏移次数
int ssize = 1; // 数组大小
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 存放值时,根据这2个参数获取坐标
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
// 2, 根据segment大小和参数中的容量, 计算每个分段内HashEntry数组的大小
// 容量也是2的n次方,且所有数组容量的和要不小于参数中的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity) { ++c;}
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c) {cap <<= 1;}
// 3, 新建segment数组, 初始化第0位
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
总结: 构造方法主要是新建了分段数组segments, 并初始化数组的第0位对象.
并发等级影响分段数组长度, 容量和并发等级决定每个分段中
2, 放入元素
ConcurrentHashMap 的key和value都不能为null值 !!!!
public V put(K key, V value) {
Segment<K,V> s;
// 不能存放null值
if (value == null) throw new NullPointerException();
// 计算坐标
int hash = hash(key);
// 取出hash的前面几位,与mask取并集,防止角标越界
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 初始化对应的分段 会按照第0个分段参数(容量,加载因子)进行初始化
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// 不懂
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 按照第0个元素进行初始化
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
// 注意: 此处保证了操作的原子性CAS
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
segment内部类方法
a, 放入值
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 所在数组对应坐标的元素
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
// 找到对应key的键值对, 表示原先有值, 则替换值
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
// 一层一层往下找
e = e.next;
} else {
// 此时表示table的对应位置没有值
if (node != null) {
node.setNext(first);
} else {
node = new HashEntry<K,V>(hash, key, value, first);
}
int c = count + 1;
// 数量增加,判断是否达到扩容值
// 1.7的concurrentHashMap 在容量达到扩容值后, 下一次新增扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY){
rehash(node);
} else {
setEntryAt(tab, index, node);
}
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}
b, 扫描并加锁
// 不断的尝试获取锁, 如果没有key对应, 则返回新建节点, 有则返回null
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
// 查找key对应的值, 没有则新建HashEntry
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 达到最大值后, 直接获取锁,没拿到会阻塞, 直到拿到为止
lock();
break;
}
// 没有达到最大值前, 每2次判断值是否改变了??为啥不是每次??
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
c, 扩容
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 容量扩充为原来的2倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 旧元素在新table中的坐标
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 单个节点的直接赋值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 多个节点的,遍历出末尾节点
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将末尾节点赋值到新坐标
newTable[lastIdx] = lastRun;
// Clone remaining nodes
// 依次克隆剩下节点
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
// 为啥每次都要计算坐标, 不是应该都一样吗???
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 添加参数中的新节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
扩容元素位置的变化, 有待验证~~
三, 总结
jdk1.7的ConcurrentHashMap 主要采用分段来进行存储数据,每个分段继承了ReentrantLock锁接口, 单独进行加锁, 所以效率会比较高.