死磕Redis5.0之跳跃表

为什么选择跳跃表

       目前经常使用的平衡数据结构有:B树,红黑树,AVL树,Splay Tree, Treep等。想象一下,给你一张草稿纸,一只笔,一个编辑器,你能立即实现一颗红黑树,或者AVL树出来吗? 很难吧,这需要时间,要考虑很多细节,要参考一堆算法与数据结构之类的树,还要参考网上的代码,相当麻烦。用跳表吧,跳表是一种随机化的数据结构,目前开源软件 Redis 和 LevelDB 都有用到它,它的效率和红黑树以及 AVL 树不相上下,但跳表的原理相当简单,只要你能熟练操作链表,就能轻松实现一个 SkipList。

有序表的搜索

考虑一个有序表


image.png

从该有序表中搜索元素 < 23, 43, 59 > ,需要比较的次数分别为 < 2, 4, 6 >,总共比较的次数
为 2 + 4 + 6 = 12 次。有没有优化的算法吗? 链表是有序的,但不能使用二分查找。类似二叉
搜索树,我们把一些节点提取出来,作为索引。得到如下结构:

image

这里我们把 < 14, 34, 50, 72 > 提取出来作为一级索引,这样搜索的时候就可以减少比较次数了。
我们还可以再从一级索引提取一些元素出来,作为二级索引,变成如下结构:

image

这里元素不多,体现不出优势,如果元素足够多,这种索引结构就能体现出优势来了。

跳跃表

下面的结构是就是跳表:
其中 -1 表示 INT_MIN, 链表的最小值,1 表示 INT_MAX,链表的最大值。

image

跳表具有如下性质:
(1) 由很多层结构组成
(2) 每一层都是一个有序的链表
(3) 最底层(Level 1)的链表包含所有元素
(4) 如果一个元素出现在 Level i 的链表中,则它在 Level i 之下的链表也都会出现。
(5) 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

跳表的搜索

image

例子:查找元素 117
(1) 比较 21, 比 21 大,往后面找
(2) 比较 37, 比 37大,比链表最大值小,从 37 的下面一层开始找
(3) 比较 71, 比 71 大,比链表最大值小,从 71 的下面一层开始找
(4) 比较 85, 比 85 大,从后面找
(5) 比较 117, 等于 117, 找到了节点。

跳表的插入

先确定该元素要占据的层数 K(采用丢硬币的方式,这完全是随机的)
然后在 Level 1 ... Level K 各个层的链表都插入元素。
例子:插入 119, K = 2

image

如果 K 大于链表的层数,则要添加新的层。
例子:插入 119, K = 4

image

跳表的高度。

n 个元素的跳表,每个元素插入的时候都要做一次实验,用来决定元素占据的层数 K,跳表的高度等于这 n 次实验中产生的最大 K,待续。。。

跳表的空间复杂度分析

根据上面的分析,每个元素的期望高度为 2, 一个大小为 n 的跳表,其节点数目的期望值是 2n。

跳表的删除

在各个层中找到包含 x 的节点,使用标准的 delete from list 方法删除该节点。
例子:删除 71

image

好啦,上面我们跳跃表就介绍完了,接下来我们看看 Redis中是如何实现跳跃表的把。我们知道 Redis 中 zset 有序集合底层就使用了跳跃表来存储数据,那么我们就来看看 zset 结构把。老规矩,我看redis 源码都是从命令入手的,那么我们就来看看 zadd 这个命令做了哪些事情把。

Redis 跳跃表实现

在讲 Redis 实现的跳跃表之前我们先讲讲 Redis 有序集合的组成成分吧!

/**
 * ZSETs use a specialized version of Skiplists
 * 跳跃表中的数据节点
 */
typedef struct zskiplistNode {
    sds ele;
    double score;
    // 后退指针
    struct zskiplistNode *backward;
    // 层
    struct zskiplistLevel {
        // 前进指针
        struct zskiplistNode *forward;
        /**
         * 跨度实际上是用来计算元素排名(rank)的,
         * 在查找某个节点的过程中,将沿途访过的所有层的跨度累积起来,
         * 得到的结果就是目标节点在跳跃表中的排位
         */
        unsigned long span;
    } level[];
} zskiplistNode;


/**
 * 跳跃表结构体
 */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

/**
 * 有序集合结构体
 */
typedef struct zset {
    /*
     * Redis 会将跳跃表中所有的元素和分值组成 
     * key-value 的形式保存在字典中
     * todo:注意:该字典并不是 Redis DB 中的字典,只属于有序集合
     */
    dict *dict;
    /*
     * 底层指向的跳跃表的指针
     */
    zskiplist *zsl;
} zset;

zadd 命令(添加元素)

之所以从 zadd 入手,是因为我们一开始使用 zadd 添加有序集合的时候,该有序集合是不存在的,redis 需要先创建一个有序集合,这样我们才能从源头开始看起,那么我们就来看看 zaddCommand 方法的实现把。

/**
 * 往有序集合中添加元素
 * 
 * @param c 客户端数据 
 */
void zaddCommand(client *c) {
    zaddGenericCommand(c, ZADD_NONE);
}

由上可知,底层还是调用的 zaddGenericCommand() 这个方法,我们继续往下看

void zaddGenericCommand(client *c, int flags) {
    // 先忽略一些干扰代码
    ... 
    /* 从字典中查找该有序集合是否存在,如果不存在则创建 */
    zobj = lookupKeyWrite(c->db, key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        // 如果配置文件中关闭了 ziplist 结构存储,则直接创建用跳跃表存储的有序集合
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx + 1]->ptr)) {
            zobj = createZsetObject();
        } else {
            // 从这里我们可以看出底层有序集合一开始是使用 ziplist 存储元素的,ziplist 下次再说
            zobj = createZsetZiplistObject();
        }
        // 往字典里面保存我们新创建的 有序集合
        dbAdd(c->db, key, zobj);
    } else {
        // 不是有序集合的结构报错
        ...
    }

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = flags;

        ele = c->argv[scoreidx + 1 + j * 2]->ptr;
        // 在这里我们终于看到了往 zset 里面添加元素的代码了
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        ...
    }
    ...
}

调用链很长,还是要大家耐着性子继续往下看!

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
     // 忽略干扰代码
     ...
    /* 编码格式是 ziplist */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        // 往 ziplist压缩列表中添加元素
        ...
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { // 往跳跃表中添加元素
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        // todo: 在字典中查找指定元素的,注意这个字典是 zset 结构体中的字典结构
        de = dictFind(zs->dict, ele);
        if (de != NULL) {
            // 如果开启了 NX 模式,且元素已经在有序集合中存在,则直接返回
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            // 获取字典中的有序集合中元素的分值
            curscore = *(double *) dictGetVal(de);

            /* 如果需要,准备增量的分数。 */
            if (incr) {
                // 增量分值
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* 分数变化时删除并重新插入。 */
            if (score != curscore) {
                ...
                // 将元素和分值重新插入到跳跃表中
                znode = zslInsert(zs->zsl, score, node->ele);
                // 释放 node 节点,因为我们可以重用 zslInsert 返回的 znode 节点,节省内存空间
                node->ele = NULL;
                zslFreeNode(node);
               // 删除干扰代码
                ...
            }
            return 1;
        } else if (!xx) {// 非 仅在元素已存在时才执行操作
            // 将有序集合中的元素复制一遍
            ele = sdsdup(ele);
            // 直接插入元素
            znode = zslInsert(zs->zsl, score, ele);
            // 将有序集合中的元素和 score 组成 key-value 存储在 zset 结构体中的字典当中
            serverAssert(dictAdd(zs->dict, ele, &znode->score) == DICT_OK);
            ...
        } else {
            ...
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

大家耐着性子看到这里有没有点小激动,因为我们马上就要看到跳跃表是如何添加元素了,废话少说,撸代码。

/*
 * 往跳跃表中添加一个元素
 */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // C 语言数组长度一旦确定就不允许修改
    // 这里和层级 level 中指向的元素相同
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // 等级指针
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position 将字符插入到指定位置*/
        rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];
        // todo: 先根据分值比较,如果分值都相同的情况下,再比较字符串的长度
        // 我们知道有序集合里面的元素都是有序的,那么肯定就有个排序规则
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 // strcmp() 以二进制的方式进行比较,不会考虑多字节或宽字节字符
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /*
     * 我们假设元素不在内部,因为我们允许重复分数,重新插入相同的元素应该永远不会发生
     * 如果元素在内部,则zslInsert()的调用者应该在哈希表中进行测试已经在里面或没有。大家可以回到前面看看上面一个方法的逻辑判断
     */

    // 使用幂次定律计算节点的层级
    level = zslRandomLevel();
    // 如果计算出的层级比当前跳跃表的层级更高
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            // 这里将超出 zsl 原来层级的指针都指向新节点 o
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    // 创建一个跳跃表节点
    x = zslCreateNode(level, score, ele);
    for (i = 0; i < level; i++) {
        // 接链
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here  更新范围由update [i]覆盖,因为x插入此处 */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels 增加跨度*/
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    /* 更新后退指针 */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

代码看到这里,大家再结合上面跳跃表介绍图形结合,理解起来应该更快,如果大家还有什么疑问,可以给我留言,我会在第一时间给大家回复的。

zrem 命令(删除元素)

看完添加元素的方法,我们再来看看 redis 是如何删除元素的。

void zremCommand(client *c) {
    // 有序集合不存在则直接返回
    ...
    // 我们知道 zrem 可以一次性删除多个元素,这里我们看到 redis 是循环删除元素的
    for (j = 2; j < c->argc; j++) {
        // 删除指定元素
        if (zsetDel(zobj, c->argv[j]->ptr)) deleted++;
        // 如果集合中没有元素了,则直接将该有序集合从字典中移除
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db, key);
            keyremoved = 1;
            break;
        }
    }

    // 删除干扰代码
    ...
}

和刚刚添加方法一样,又调用了其他的删除方法,继续往下看。

int zsetDel(robj *zobj, sds ele) {
    // 如果编码格式是 ziplist 则从 ziplist 中删除元素
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        ...
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {// 编码格式是跳跃表
        zset *zs = zobj->ptr;
        dictEntry *de;
        double score;
        // 在字典中解除该元素节点,但是并没有释放该节点的内存,会在下面的 skiplist 释放
        de = dictUnlink(zs->dict, ele);
        if (de != NULL) {
            /* Get the score in order to delete from the skiplist later. */
            score = *(double *) dictGetVal(de);

            /* 我们知道有序集合中会将 元素和分值构造成 entry 存储在 字典中,所以要释放 entry */
            dictFreeUnlinkedEntry(zs->dict, de);

            /* 在跳跃表中释放元素 */
            int retval = zslDelete(zs->zsl, score, ele, NULL);
            serverAssert(retval);

            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* No such element found. */
}

代码进行到这里,我们应该也能猜到 zslDelete 方法里面到底做了哪些操作吧!来,我们来猜一猜。

  1. 首先要在跳跃表中定位到要删除的元素吧
  2. 我们知道该节点每一层都有前驱、后继指针,那么我们删除这个节点的时候,自然也要改变该节点的每一层节点指针的指向啦。
  3. 我们知道 redis 跳跃表中还有跨度的概念,该节点没了,那么肯定要改变相关节点的跨度
  4. 我们还知道跳跃表是有序的,有个 rank 排名的概念,删除了一个节点,后面的节点排名肯定也要做相应的改变咯。
  5. 节点删除了,最后肯定就是释放该节点空间呗。
    上面我们分析完了,实际是不是和我们预估的一样呢,我们看看代码吧
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 找到要更新的节点
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        // 删除 skiplistNode 节点
        zslDeleteNode(zsl, x, update);
        if (!node)
            // 释放节点空间
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

我们看看 redis 是如何删除跳跃表中的元素的

/*
 * Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank
 * 该方法会修改跳跃表中的跨度
 *
 * zslDelete,zslDeleteByScore和zslDeleteByRank 使用的内部函数
 */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            // 计算跨度
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while (zsl->level > 1 && zsl->header->level[zsl->level - 1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

zscore 命令(获取元素分值)

我们可以想想获取某个值的分值,在跳跃表中我们是不是要先在跳跃表中找到指定节点然后再获取该节点的分值吗?带着这个疑问我们看看源码吧!

/**
 * zscore 获取分值
 * @param c 
 */
void zscoreCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    double score;
    // 如果在字典中找不到该有序集合或者元素类型不是 OBJ_ZSET,直接返回
    if ((zobj = lookupKeyReadOrReply(c, key, shared.nullbulk)) == NULL ||
        checkType(c, zobj, OBJ_ZSET))
        return;
    // 获取分值
    if (zsetScore(zobj, c->argv[2]->ptr, &score) == C_ERR) {
        addReply(c, shared.nullbulk);
    } else {
        addReplyDouble(c, score);
    }
}

从上面我们可以看到,分值的获取实际是通过 zsetScore 方法获取的。

int zsetScore(robj *zobj, sds member, double *score) {
   if (!zobj || !member) return C_ERR;
   // 如果编码格式压缩链表
   if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
       if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
   } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {// 编码格式是跳跃表
       zset *zs = zobj->ptr;
       // 直接从 zset 中的字典获取指定的 dictEntry
       dictEntry *de = dictFind(zs->dict, member);
       if (de == NULL) return C_ERR;
       // 从 dictEntry 中获取 score
       *score = *(double *) dictGetVal(de);
   } else {
       serverPanic("Unknown sorted set encoding");
   }
   return C_OK;
}

zrank 命令(获取某个元素的排名)

当我们想获取有序集合中某个元素的排名时,zrank 命令是我们很好的选择,zrank 命令返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递增(从小到大)顺序排列。排名以 0 为底,也就是说, score 值最小的成员排名为 0 。下面我们看看他里面具体实现吧!

long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;

    llen = zsetLength(zobj);

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        // 压缩链表 rank 计算方式
        ...
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;
        // 从 zset 中的字典中直接拿到指定元素 dictEntry
        de = dictFind(zs->dict, ele);
        if (de != NULL) {
            score = *(double *) dictGetVal(de);
            rank = zslGetRank(zsl, score, ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            if (reverse)
                return llen - rank;
            else
                return rank - 1;
        } else {
            return -1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

接下来就是真正 rank 计算的方法咯,不要分心,专心看。

unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    // 这里从最上层开始查找节点元素的排名
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele, ele) == 0) {
            return rank;
        }
    }
    return 0;
}

好啦,到这里 Redis 中实现的跳跃表大概讲完了,当然还有其他 api 没有讲啦,感兴趣的同学可以自己琢磨琢磨,欢迎给我留言。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容