剖析Java 集合框架(六)-ConcurrentSkipListMap

前言

前面我们分析了 HashMapConcurrentHashMap ,都是一种键值对形式的数据存储容器,区别是后者在多线程环境下也能保证数据安全。但是它们的内部元素都是无序的,元素所处桶的位置是通过key的hashcode经过hash与桶容量取模确定的。当然我们也知道TreeMap即是一种有序的键值对容器,它内部是基于红黑树算法实现排序的,而且非线程安全,在多线程环境下使用存在问题。今天我们来了解下基于跳表(SkipList) 实现的线程安全的 ConcurrentSkipListMap

跳表

传统意义的单链表是一个线性结构,向有序的链表中插入一个节点需要 O(n)的时间,查找操作需要 O(n)的时间。
如果我们把链表中的元素攀升几个元素,下图所示


跳表数据结构

每间隔一个元素攀升一个,相比于单链表的O(n)可以减少查找所需时间为 O(n/2)。因为我们可以先通过每个节点的最上面的指针先进行查找,这样就能跳过一半的节点。
这就是跳表的基本思路,先从最上层的链表查询,这样直接跳过了大量不必要的元素,在该层碰壁了,无法继续往后走了,再往下层走。每个节点不止有指向后一个节点的指针,还有指向下一层元素的指针,从而加快查找、删除等操作。

Skip list(跳表) 是一种可以代替平衡树的数据结构。让已排序的数据分布在多层链表中,以 0-1 随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,建立多级索引,实现以二分查找遍历一个有序链表,从而提高了效率。

跳转-查询

查找就是给定一个 key,查找这个 key 是否出现在跳跃表中。我们以上图查找 67 为例,查找路线如下图:


跳表数据结构-查找

查找的过程有点像我们的二分查找,不过这里我们是通过为链表建立多级索引,以空间换时间来实现二分查找。所以,跳表的查询操作的时间复杂度近似O(logN)。

跳表-插入

插入过程包含几个步骤:

  1. 找到需要插入的位置。
  2. 插入元素。
  3. 根据算法: 算出插入的元素是否需要攀升,攀升几层。
  4. 攀升后的元素,跟随处层的前后元素关联起来。
    图示如下。先找到对应位置,并插入


    跳表数据结构-插入

如果攀升了,则把每层的攀升元素和前后元素关联起来


跳表数据结构-插入-攀升到顶层

关联时的操作可以使用一个数组,该数组的容量即为层数,查找的时候即可往改数组里赋值,每一层往下跳时就往数组里放入该节点,如果插入的元素攀升到了该层,则该节点即为该攀升元素的前节点,该节点的后节点即为攀升元素的后节点。

跳表-删除

理解了插入,删除也就没什么很忙困难了:


跳表数据结构-删除

如果next指向的即为目标元素,则删除目标节点,并调整next直线目标节点的next节点

ConcurrentSkipListMap

ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表。内部是 SkipList(跳表)结构实现,在理论上能够在 O(log(n))时间内完成查找、插入、删除操作。

数据结构

ConcurrentSkipListMap 有一个 HeadIndex类型的head属性,为整个数据的访问起点,HeadIndex集成于 Index,存放的位每层的头索引节点,Index实际也是链表,类似于二叉树,存在下层节点属性down和后续索引节点right还有数据节点nodeNode即存实际数据的节点,实际就是个单链表。

ConcurrentSkipListMap UML图

public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>,
               Cloneable,java.io.Serializable {
   /**  Special value used to identify base-level header*/
  //该值用于标记底层数据节点的头结点
   private static final Object BASE_HEADER = new Object();
 
    /** The topmost head index of the skiplist.*/
    // 顶层索引的头节点
    private transient volatile HeadIndex<K,V> head;
Node 数据节点

存储数据的节点,最基本的数据存储单元

static final class Node<K,V> {
     final K key;
     volatile Object value;
     volatile Node<K,V> next;

// 删除结点,在结点后面添加一个 marker 结点或者将结点和其后的 marker 结点从其前驱中断开。
     void helpDelete(Node<K,V> b, Node<K,V> f) {
            /*
             * Rechecking links and then doing only one of the
             * help-out stages per call tends to minimize CAS
             * interference among helping threads.
             */
            if (f == next && this == b.next) {
                if (f == null || f.value != f) // not already marked
                    casNext(f, new Node<K,V>(f));
                else
                    b.casNext(this, f.next);
            }
        }
}
Index 索引节点
static class Index<K,V> {
     final Node<K,V> node; // 数据节点
     final Index<K,V> down; // 下级level (数据都相同)
     volatile Index<K,V> right; //同一level 后继索引节点
    
// cas 设置当前索引节点的 right 指向为val
      final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
    // 如果this的rightOffset的地址的值为cmp,则将值改为val
          return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
      }

  // 两个索引节点通过right连接, 然后替换当前索引的right
     final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        Node<K,V> n = node;
        newSucc.right = succ;
        return n.value != null && casRight(succ, newSucc);
    }

  // 解除当前索引节点和succ的连接,并将当前right的指向succ.right节点
    final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }
}

HeadIndex 头索引节点

封装了Index,作为每层的头结点,level属性用于标识当前层次的序号

/**
     * Nodes heading each level keep track of their level.
     */
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

下面的图能很清晰的表达出各个节点间的连接关系


ConcurrentSkipListMap数据结构图

构造方法

几个构造方法都是调用了 initialize()方法

public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
    this.comparator = comparator;
    initialize();
}

private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),null, null, 1);
}

主要是初始化整个跳表的操作,head指向一个new Object()数据节点;

get 方法

首先根据 key 找到前驱结点,然后从前驱结点开始往后查找,找到与 key 相等的结点,则返回该结点,否则,返回 null。在这个过程中会删除一些已经标记为删除状态的结点

public V get(Object key) {
        return doGet(key);
    }

private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
    // 从左上到右下 查询key的前驱节点 然后从前驱节点往后查找和key相等的节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
        // 发送变化了 退出 重试
                if (n != b.next)                // inconsistent read
                    break;
        // 被其他线程删除了 则先删除再重试
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
  }

// 从左上往右下 查询key的前驱节点。 过程中删除打上了删除标记的节点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
        // q的头结点 r的q的后续节点
            for (Index<K,V> q = head, r = q.right, d;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
        // value为空的 直接删除 
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
          // 继续往后寻找
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
        // 已经到达底层  并且后面的元素到大于改key 则返回该节点
                if ((d = q.down) == null)
                    return q.node;
        // 往下寻找
                q = d;
                r = d.right;
            }
        }
}

就是跳表从左上到右下的查找过程。

put 方法

核心方法是doPut()方法。

private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            // b = 前驱数据节点  n=前驱节点是后续节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    // 如果先前获取到的n 又不等于后续节点了 重试
                    if (n != b.next)               // inconsistent read
                        break;
                    // n的value为空 则删除  重试
                    if ((v = n.value) == null) {   // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    // b的value 为空 则删除 重试
                    if (b.value == null || v == n) // b is deleted
                        break;
                    // 比较key和n.key的大小
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    // 如果已存在大小一样的key
                    if (c == 0) {
                        // cas 替换value
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        // 失败 重试
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }
                // 新建一个数据节点
                z = new Node<K,V>(key, value, n);
                // 将前驱节点额后续节点 cas设置为新节点
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

        // 获取个随机数
        int rnd = ThreadLocalRandom.nextSecondarySeed();
        // 需要攀升
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            // 攀升多少层
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            // 新增的索引节点
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            // 如果攀升的层数不大于当前最大层数
            if (level <= (max = h.level)) {
                // 在每一层创建一个相同数据的索引节点,用down关联起来
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            // 超过最大层数 则增加一层
            else { // try to grow by one level
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                        (Index<K,V>[])new Index<?,?>[level+1];
                // 在每一层创建一个相同数据的索引节点,用down关联起来 并放入数组中
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    // 最大层数已经发生变化了
                    if (level <= oldLevel) // lost race to add level
                        break;
                    // 新的头索引节点
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    // 新增的层数 创建每层的头索引节点 并关联起来,后续节点 即为put进来的数据节点
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    // cas 设置新的头索引节点
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }

            //给新添加的索引节点 关联前后驱 节点
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    // 查找当前层的 前驱节点q 后驱节点r
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        //如果 n 正在被其他线程删除,那么调用 unlink 去删除它
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            // r重新获取q的后驱节点 重新循环
                            r = q.right;
                            continue;
                        }
                        // 如果大于0  则都往后移一个 继续往后寻找
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        // 将t插入到q 和 r中 ,同层 失败 重试
                        if (!q.link(r, t))
                            break; // restart
                        // 插入的时候 t被删除了 直接退出
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        // 到底了 退出
                        if (--insertionLevel == 0)
                            break splice;
                    }
                    // j 和 insertionLevel 应该同步
                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    // 处理下一层
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

图解可以参考上面的跳表的插入。
理解了过程其实还是挺简单的,只是中间有很多各种重试的场景,攀升也有两种情况,我尽量总结的清晰,过程如下:

  1. 验证key,value 不能为空。
  2. 插入元素
    1. 通过 findPredecessor方法找到待插入位置的前驱节点b后续节点n。查询过程中遇到value为空直接删除。
    2. 判断b,n是否已经发生变化,是否已被打上删除标识。重试
    3. 比较n.key待插入的key是否相等,如果是则cas替换n.value。失败重试。成功则直接返回退出。
    4. 否则创建一个数据节点z,并将z插入到b和n中间。失败重试。
  3. 建立索引
    1. 随机算法(获取一个随机数 & 0x80000001 == 0) 判断是否需要给这个数建立索引。需要建立几层索引。
    2. 如果需要建立索引的层不大于当前最大层数。在需要建立索引的每一层创建一个相同数据的索引节点,down关联起来。
    3. 如果超过了最大层数,则设置需要建立索引的层数为当前最大层数+1。在每一层创建一个相同数据的索引节点,down关联起来,因为新增了层。创建新的索引头结点HeadIndex,right直接指向新增的索引节点,down关联下层头节点。并设置跳表的头结点head为新的索引头节点。
    4. 新增的索引插入到索引表中
      1. 从上到下 从左到右 查找每一层的 前驱节点q 后驱节点r。
      2. 将每一层的索引节点插入的当前层的q和r中。

remove 方法

public V remove(Object key) {
        return doRemove(key, null);
    }

final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        // b  = key的前驱节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
        // n为空 说明已经被删除了
                if (n == null)
                    break outer;
        // 后续节点的后续节点
                Node<K,V> f = n.next;
        // 不相等 重试
                if (n != b.next)                    // inconsistent read
                    break;
        // value为空 说明已经被打上删除标识了 删除之后重试
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
        // b被打上删除标识,重试
                if (b.value == null || v == n)      // b is deleted
                    break;
        // key比后续节点的key小 说明已经目标已经被删除了
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
        // 继续往后找
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
        // key相等 值不相等 退出删除方法
                if (value != null && !value.equals(v))
                    break outer;
        // 将目标数据的value 设置为null  失败重试
                if (!n.casValue(v, null))
                    break;
        // 往n和f后面插入一个marker,  然后把b的后续节点指向f
        // 如果其中任何一个操作失败 则说明别人线程改变了 则从b开始往后执行删除
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
            //清楚索引 因为value已经为null了
                    findPredecessor(key, cmp);      // clean index
           // 如果头节点的后续节点为空 执行减层
                    if (head.right == null)
            // 如果头三层的后续节点都是空 则删除一层  失败则不删
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

删除操作还是有点复杂,大概过程如下:

  1. 根据key查询到前驱节点b,后续节点n(即待删除节点),n的后续节点f。查找过程会删除标记了删除标识了的节点。
  2. 验证b,n,f是否已发送变化,是否已被删除。是则重试。
  3. 将待删除节点的value置为null。
  4. 尝试在n和f节点间插入一个Marker节点,成功之后尝试将b的后续节点设置为f。
  5. 如果失败了则说明已经被其他线程改变了结构,所以直接重新从待删除节点的前驱节点开始往后判断删除。
  6. 如果都成功了则清除索引
  7. 是否需要减层。头三层的头索引节点的后续节点都为null,尝试删除1层,失败放弃。

使用源码里的注解画的图

     +------+       +------+      +------+
...  |   b  |------>|   n  |----->|   f  | ...
     +------+       +------+      +------+
     
     +------+       +------+      +------+       +------+
...  |   b  |------>|   n  |----->|marker|------>|   f  | ...
     +------+       +------+      +------+       +------+

     +------+                                    +------+
...  |   b  |----------------------------------->|   f  | ...
     +------+                                    +------+

size 方法

ConcurrentSkipListMapsize()方法不像先前我们分析过的集合类,有一个属性时时记录容量值,O(1)时间复杂度。而是O(n)。

public int size() {
        long count = 0;
    // 从第一个value不为null的数据节点往后遍历 累计有效节点数
        for (Node<K,V> n = findFirst(); n != null; n = n.next) {
            if (n.getValidValue() != null)
                ++count;
        }
        return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
    }

// 找到第一个不value为null的数据节点
 final Node<K,V> findFirst() {
        for (Node<K,V> b, n;;) {
            if ((n = (b = head.node).next) == null)
                return null;
            if (n.value != null)
                return n;
            n.helpDelete(b, n.next);
        }
    }

总体过程:通过findFirst方法获取第一个value不为null的Node数据节点,然后往后遍历,如果是有效数据getValidValue,则加1。

总结

ConcurrentSkipListMap还是很有意思的,不同于以前的算法数据结构,但是也很快速实现了有序数据的插入,查找,删除操作,并且保证了数据安全。跳表数据结构也应用到了很多地方,比如redis,redis的实际实现算法跟ConcurrentSkipListMap有较大不同,又挖个坑,以后分享。

后记

这次画图分享总体下来还是很爽的,通读源码,战术总结,手工绘图,不容易,不过还是挺满意,自己也学到了很多,希望大家喜欢。

量变引发质变,经常进步一点点,期待蜕变的自己。

©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容