跳表SkipList

对于有序并且对增删改操作友好的数据结构有List、Tree等,对于Tree实现起来可能比较复杂,而SkipList(跳表)也可实现有序存储并且增删改的性能也不错,只是增加了空间复杂度,是一种空间换时间的思路,在Java中ConcurrentSkipListMap就是基于跳表实现的,它可以作为并发安全的有序集合使用,并且锁粒度可控,比Collections.synchronizedMap性能要好。至于为什么不用Tree来实现并发安全的有序集合,很大程度上是因为Tree的复杂性很难控制锁粒度。

1、SkipList

下面以一个例子来解释跳表的原理:

一个LinkedList,存储了【1,3,6,8,9,11,15,33】,它的存储结构是这个样子的:


image.png

当然作为链表,查找操作一个元素的时间复杂度是O(n),效率更高的树存储,可以达到O(lgn)。但是如上面所说树的复杂性较高,实现起来需要注意很多地方,而在并发场景下更重要的是树没法最小化控制锁粒度,而跳表则更适合,在SkipList中上面例子的存储结构可能是这样的:


image.png

SkipList使用分层的结构,第一层结构和List一样,往上一层以一定间隔提取数据建立一个类似索引的结构,往上依次可以理解为一级索引与二级索引。
比如查找33就不需要从头遍历链表了,查找路径是这样的:


image.png
2、ConcurrentSkipListMap

首先参考上面的图来介绍ConcurrentSkipListMap中定义的存储结构

Node:

数据存储还是使用Node节点,包括key、value、next:

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
......
}
index:

从上面的图可以知道,还需要一个结构来定义某一层的某一个节点,这个节点需要存储的信息有自己所属的level、下一个节点、右一个节点、该节点对应的Node:

static class Index<K,V> {
    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

    /**
     * compareAndSet right field.
     */
    final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
        return RIGHT.compareAndSet(this, cmp, val);
    }

    final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        Node<K,V> n = node;
        newSucc.right = succ;
        return n.value != null && casRight(succ, newSucc);
    }

    final boolean unlink(Index<K,V> succ) {
        return node.value != null && casRight(succ, succ.right);
    }

    // VarHandle mechanics
    private static final VarHandle RIGHT;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            RIGHT = l.findVarHandle(Index.class, "right", Index.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }
}


  static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

在ConcurrentSkipListMap中定义了Index和HeadIndex两个。

我们从最基本的查询数据方法来看看ConcurrentSkipListMap是怎么实现的:

private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;         //比较器,可以构造初始化时传入
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {     //findPredecessor获取前驱节点
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read          //两次读取不一致,循环重试
                break;
            if ((v = n.value) == null) {    // n is deleted          //该节点已被删除,循环重试
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted      //前驱节点被删除,循环重试
                break;
            if ((c = cpr(cmp, key, n.key)) == 0)     //比较节点key值,相等则返回该节点value,不等说明这期间发生了变化,循环重试
                return n;
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

/**
 * Returns a base-level node with key strictly less than given key,
 * or the base-level header if there is no such node.  Also
 * unlinks indexes to deleted nodes found along the way.  Callers
 * rely on this side-effect of clearing indices to deleted nodes.
 * @param key the key
 * @return a predecessor of key
 */
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        for (Index<K,V> q = head, r = q.right, d;;) {
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}

简单的说跳表查找数据key-A的值的时候先通过index索引节点来定位到小于A的level =1的最大索引节点,再向右遍历查找到key = A的节点。(这也是跳表相比与树的另一个优势:跳表查找某一区间的数的复杂度也是lgn,因为只要定位到左区间,向右就可以遍历到右区间了,而树对于获取区间值却不行,Redis中数据存储使用的就是跳表+散列表。当然像红黑树这种对于跳表的优势在于出现的比较早,Java中很多已有的结构都是红黑树实现的,直接用就行了,而跳表用的比较少,如果自己实现还要写跳表的逻辑)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容