前言
前面我们分析了 HashMap 和 ConcurrentHashMap ,都是一种键值对形式的数据存储容器,区别是后者在多线程环境下也能保证数据安全。但是它们的内部元素都是无序的,元素所处桶的位置是通过key的hashcode经过hash与桶容量取模确定的。当然我们也知道TreeMap即是一种有序的键值对容器,它内部是基于红黑树算法实现排序的,而且非线程安全,在多线程环境下使用存在问题。今天我们来了解下基于跳表(SkipList) 实现的线程安全的 ConcurrentSkipListMap。
跳表
传统意义的单链表是一个线性结构,向有序的链表中插入一个节点需要 O(n)的时间,查找操作需要 O(n)的时间。
如果我们把链表中的元素攀升几个元素,下图所示
每间隔一个元素攀升一个,相比于单链表的O(n)可以减少查找所需时间为 O(n/2)。因为我们可以先通过每个节点的最上面的指针先进行查找,这样就能跳过一半的节点。
这就是跳表的基本思路,先从最上层的链表查询,这样直接跳过了大量不必要的元素,在该层碰壁了,无法继续往后走了,再往下层走。每个节点不止有指向后一个节点的指针,还有指向下一层元素的指针,从而加快查找、删除等操作。
Skip list(跳表) 是一种可以代替平衡树的数据结构。让已排序的数据分布在多层链表中,以 0-1 随机数决定一个数据的向上攀升与否,通过“空间来换取时间
”的一个算法,建立多级索引,实现以二分查找遍历一个有序链表,从而提高了效率。
跳转-查询
查找就是给定一个 key,查找这个 key 是否出现在跳跃表中。我们以上图查找 67 为例,查找路线如下图:
查找的过程有点像我们的二分查找,不过这里我们是通过为链表建立多级索引,以空间换时间来实现二分查找。所以,跳表的查询操作的时间复杂度近似O(logN)。
跳表-插入
插入过程包含几个步骤:
- 找到需要插入的位置。
- 插入元素。
- 根据算法: 算出插入的元素是否需要攀升,攀升几层。
-
攀升后的元素,跟随处层的前后元素关联起来。
图示如下。先找到对应位置,并插入
如果攀升了,则把每层的攀升元素和前后元素关联起来
关联时的操作可以使用一个数组,该数组的容量即为层数,查找的时候即可往改数组里赋值,每一层往下跳时就往数组里放入该节点,如果插入的元素攀升到了该层,则该节点即为该攀升元素的前节点,该节点的后节点即为攀升元素的后节点。
跳表-删除
理解了插入,删除也就没什么很忙困难了:
如果next指向的即为目标元素,则删除目标节点,并调整next直线目标节点的next节点
ConcurrentSkipListMap
ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表。内部是 SkipList(跳表)结构实现,在理论上能够在 O(log(n))时间内完成查找、插入、删除操作。
数据结构
ConcurrentSkipListMap 有一个 HeadIndex类型的head
属性,为整个数据的访问起点,HeadIndex集成于 Index,存放的位每层的头索引节点,Index实际也是链表,类似于二叉树,存在下层节点属性down
和后续索引节点right
还有数据节点node
。Node即存实际数据的节点,实际就是个单链表。
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>,
Cloneable,java.io.Serializable {
/** Special value used to identify base-level header*/
//该值用于标记底层数据节点的头结点
private static final Object BASE_HEADER = new Object();
/** The topmost head index of the skiplist.*/
// 顶层索引的头节点
private transient volatile HeadIndex<K,V> head;
Node 数据节点
存储数据的节点,最基本的数据存储单元
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
// 删除结点,在结点后面添加一个 marker 结点或者将结点和其后的 marker 结点从其前驱中断开。
void helpDelete(Node<K,V> b, Node<K,V> f) {
/*
* Rechecking links and then doing only one of the
* help-out stages per call tends to minimize CAS
* interference among helping threads.
*/
if (f == next && this == b.next) {
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next);
}
}
}
Index 索引节点
static class Index<K,V> {
final Node<K,V> node; // 数据节点
final Index<K,V> down; // 下级level (数据都相同)
volatile Index<K,V> right; //同一level 后继索引节点
// cas 设置当前索引节点的 right 指向为val
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
// 如果this的rightOffset的地址的值为cmp,则将值改为val
return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
// 两个索引节点通过right连接, 然后替换当前索引的right
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
Node<K,V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}
// 解除当前索引节点和succ的连接,并将当前right的指向succ.right节点
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
}
HeadIndex 头索引节点
封装了Index,作为每层的头结点,level
属性用于标识当前层次的序号
/**
* Nodes heading each level keep track of their level.
*/
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
下面的图能很清晰的表达出各个节点间的连接关系
构造方法
几个构造方法都是调用了 initialize
()方法
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),null, null, 1);
}
主要是初始化整个跳表的操作,head指向一个new Object()数据节点;
get 方法
首先根据 key 找到前驱结点,然后从前驱结点开始往后查找,找到与 key 相等的结点,则返回该结点,否则,返回 null。在这个过程中会删除一些已经标记为删除状态的结点
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 从左上到右下 查询key的前驱节点 然后从前驱节点往后查找和key相等的节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
// 发送变化了 退出 重试
if (n != b.next) // inconsistent read
break;
// 被其他线程删除了 则先删除再重试
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
// 从左上往右下 查询key的前驱节点。 过程中删除打上了删除标记的节点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// q的头结点 r的q的后续节点
for (Index<K,V> q = head, r = q.right, d;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// value为空的 直接删除
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
// 继续往后寻找
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 已经到达底层 并且后面的元素到大于改key 则返回该节点
if ((d = q.down) == null)
return q.node;
// 往下寻找
q = d;
r = d.right;
}
}
}
就是跳表从左上到右下的查找过程。
put 方法
核心方法是doPut()方法。
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// b = 前驱数据节点 n=前驱节点是后续节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// 如果先前获取到的n 又不等于后续节点了 重试
if (n != b.next) // inconsistent read
break;
// n的value为空 则删除 重试
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// b的value 为空 则删除 重试
if (b.value == null || v == n) // b is deleted
break;
// 比较key和n.key的大小
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// 如果已存在大小一样的key
if (c == 0) {
// cas 替换value
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 失败 重试
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 新建一个数据节点
z = new Node<K,V>(key, value, n);
// 将前驱节点额后续节点 cas设置为新节点
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
// 获取个随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 需要攀升
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
// 攀升多少层
while (((rnd >>>= 1) & 1) != 0)
++level;
// 新增的索引节点
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
// 如果攀升的层数不大于当前最大层数
if (level <= (max = h.level)) {
// 在每一层创建一个相同数据的索引节点,用down关联起来
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
// 超过最大层数 则增加一层
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
// 在每一层创建一个相同数据的索引节点,用down关联起来 并放入数组中
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
// 最大层数已经发生变化了
if (level <= oldLevel) // lost race to add level
break;
// 新的头索引节点
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
// 新增的层数 创建每层的头索引节点 并关联起来,后续节点 即为put进来的数据节点
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// cas 设置新的头索引节点
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
//给新添加的索引节点 关联前后驱 节点
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
// 查找当前层的 前驱节点q 后驱节点r
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
//如果 n 正在被其他线程删除,那么调用 unlink 去删除它
if (n.value == null) {
if (!q.unlink(r))
break;
// r重新获取q的后驱节点 重新循环
r = q.right;
continue;
}
// 如果大于0 则都往后移一个 继续往后寻找
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
// 将t插入到q 和 r中 ,同层 失败 重试
if (!q.link(r, t))
break; // restart
// 插入的时候 t被删除了 直接退出
if (t.node.value == null) {
findNode(key);
break splice;
}
// 到底了 退出
if (--insertionLevel == 0)
break splice;
}
// j 和 insertionLevel 应该同步
if (--j >= insertionLevel && j < level)
t = t.down;
// 处理下一层
q = q.down;
r = q.right;
}
}
}
return null;
}
图解可以参考上面的跳表的插入。
理解了过程其实还是挺简单的,只是中间有很多各种重试的场景,攀升也有两种情况,我尽量总结的清晰,过程如下:
- 验证key,value 不能为空。
-
插入元素
- 通过
findPredecessor
方法找到待插入位置的前驱节点b
,后续节点n
。查询过程中遇到value为空直接删除。 - 判断b,n是否已经发生变化,是否已被打上删除标识。重试
- 比较n.key待插入的key是否相等,如果是则cas替换n.value。失败重试。成功则直接返回退出。
- 否则创建一个数据节点z,并将z插入到b和n中间。失败重试。
- 通过
-
建立索引
- 随机算法(获取一个随机数 & 0x80000001 == 0) 判断是否需要给这个数建立索引。需要建立几层索引。
- 如果需要建立索引的层不大于当前最大层数。在需要建立索引的每一层创建一个相同数据的索引节点,down关联起来。
- 如果超过了最大层数,则设置需要建立索引的层数为当前最大层数+1。在每一层创建一个相同数据的索引节点,down关联起来,因为新增了层。创建新的索引头结点HeadIndex,right直接指向新增的索引节点,down关联下层头节点。并设置跳表的头结点
head
为新的索引头节点。 -
新增的索引插入到索引表中
- 从上到下 从左到右 查找每一层的 前驱节点q 后驱节点r。
- 将每一层的索引节点插入的当前层的q和r中。
remove 方法
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// b = key的前驱节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// n为空 说明已经被删除了
if (n == null)
break outer;
// 后续节点的后续节点
Node<K,V> f = n.next;
// 不相等 重试
if (n != b.next) // inconsistent read
break;
// value为空 说明已经被打上删除标识了 删除之后重试
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// b被打上删除标识,重试
if (b.value == null || v == n) // b is deleted
break;
// key比后续节点的key小 说明已经目标已经被删除了
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
// 继续往后找
if (c > 0) {
b = n;
n = f;
continue;
}
// key相等 值不相等 退出删除方法
if (value != null && !value.equals(v))
break outer;
// 将目标数据的value 设置为null 失败重试
if (!n.casValue(v, null))
break;
// 往n和f后面插入一个marker, 然后把b的后续节点指向f
// 如果其中任何一个操作失败 则说明别人线程改变了 则从b开始往后执行删除
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
//清楚索引 因为value已经为null了
findPredecessor(key, cmp); // clean index
// 如果头节点的后续节点为空 执行减层
if (head.right == null)
// 如果头三层的后续节点都是空 则删除一层 失败则不删
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
删除操作还是有点复杂,大概过程如下:
- 根据key查询到前驱节点b,后续节点n(即待删除节点),n的后续节点f。查找过程会删除标记了删除标识了的节点。
- 验证b,n,f是否已发送变化,是否已被删除。是则重试。
- 将待删除节点的value置为null。
- 尝试在n和f节点间插入一个Marker节点,成功之后尝试将b的后续节点设置为f。
- 如果失败了则说明已经被其他线程改变了结构,所以直接重新从待删除节点的前驱节点开始往后判断删除。
- 如果都成功了则清除索引
- 是否需要减层。头三层的头索引节点的后续节点都为null,尝试删除1层,失败放弃。
使用源码里的注解画的图
+------+ +------+ +------+
... | b |------>| n |----->| f | ...
+------+ +------+ +------+
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|------>| f | ...
+------+ +------+ +------+ +------+
+------+ +------+
... | b |----------------------------------->| f | ...
+------+ +------+
size 方法
ConcurrentSkipListMap 的size()
方法不像先前我们分析过的集合类,有一个属性时时记录容量值,O(1)时间复杂度。而是O(n)。
public int size() {
long count = 0;
// 从第一个value不为null的数据节点往后遍历 累计有效节点数
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
// 找到第一个不value为null的数据节点
final Node<K,V> findFirst() {
for (Node<K,V> b, n;;) {
if ((n = (b = head.node).next) == null)
return null;
if (n.value != null)
return n;
n.helpDelete(b, n.next);
}
}
总体过程:通过findFirst
方法获取第一个value不为null的Node数据节点,然后往后遍历,如果是有效数据getValidValue
,则加1。
总结
ConcurrentSkipListMap还是很有意思的,不同于以前的算法数据结构,但是也很快速实现了有序数据的插入,查找,删除操作,并且保证了数据安全。跳表数据结构也应用到了很多地方,比如redis,redis的实际实现算法跟ConcurrentSkipListMap有较大不同,又挖个坑,以后分享。
后记
这次画图分享总体下来还是很爽的,通读源码,战术总结,手工绘图,不容易,不过还是挺满意,自己也学到了很多,希望大家喜欢。
量变引发质变,经常进步一点点,期待蜕变的自己。