Redis Zrange 范围查找-底层分析

介绍

Redis Zrange 是有序集合( SortedSet ) 提供的一个命令,可以返回有序集中指定区间内的成员,而有序集合比较有用的一个功能就是 "范围查找" 时间复杂度平均是 O[(LogN)+M] M是返回的元素个数,有序集合底层是通过字典+跳跃表的方式来实现的,我们这里只看这个跳跃表结构如何实现范围查找.

跳跃表

跳跃表可以理解成是可以二分查找的链表,因为跳跃表是基于链表去实现的,而每个节点都会随机一个层数,同时这个节点会根据当前随机出的level层数去创建一个level数组,而这个数组里面就是保存每一层的对应的前驱指针,所以每一个节点的level层数都不一样,这样最后的结果就是有的节点在高层有的在低层,但是根据随机算法设定高层的节点数永远小于低层,全部分散开从高到低形成跨度,每次查找某个值也是先在高level层去查找,因为高level层的节点永远是最少的且跨度最大的,我们这里具体太详细的概念就不做多说了,可以详细去了解下这个 数据结构-跳跃表.

redis-跳跃表

上面这张图大概就是描述redis里面的一个跳跃表结构,score就是分数也就是节点的value值,查找的时候就是利用每一层的跨度从高到低去比较value的大小最终确定结果,也可以理解成从高到低不断的缩小查找范围,跟二分查找有点类似,这也就是我们上面说的跳跃表就是一个可以二分的链表,但是可以看到除了score还有一个span字段,正常的跳跃表是没有这个span字段的,而作者为了可以实现范围查找(分页)扩展了一个这个字段用来记录从高到低每个节点距离下一个节点之间的跨度。

范围查找(分页) span 字段

首先因为我们要做的是范围查找,不像上面的可以按分数从高到低利用跨度去比较缩小范围然后最后定位一个节点,我们要做的是 limit 0 10 -- limit 20 10 -- limit 100 10 这样的效果,如果不加这个span字段那么就只能退化成从最低层节点开始的位置不断的向后遍历直到找到我们要的范围起始位置节点,那么最终的时间复杂度也就是O(n),然而加上这个span字段之后,也可以跟查找分数的方式一样从最高层利用span这个字段(记录距离下一个节点的距离),去不断的累加span值判断是否小于等于我们的起始位置,如果不等于则判断是否当前就是我们的起始位置,如果也不是那么就降下来一层继续累加span,跟查找分数的方式基本一样.

举个栗子:
查找 > zrange key 5,2
那么就从最高层开始计算,首先
level4 -- score:0-span:1 + score:10-span:4 == 5
level3 -- ....
level2 -- ....
这样的话直接计算出来 score:20 是第5个节点,那么就确认了,也就代表直接定位到了第5个节点指针位置,那么最后就会在最低层以这个score:20节点指针作为开始位置不断向后获取2个节点然后返回结束.

所以使用 zrange 这个命令去范围获取,平均的时间复杂度就是 O[(LogN)+M],最坏的情况O(n)

下面这段代码是从 redis 源码 src/t_zset.c 里面抽出来的,功能就是创建一个跳表,为了生成的跳跃表跟上图一致,我这里对代码稍作修改了下,使每个节点层数及分布跟上图完全一致,也就是每个score节点都固定了level层数,不让它随机分配,同时也可以看下每插入一个score节点程序是如何算出span的.

#include <stdio.h>
#include <stdlib.h>

//设置跳表层数
#define ZSKIPLIST_MAXLEVEL 4

//跳表节点结构体
typedef struct zskiplistNode {
    double score;
    //跳表节点中 存放的前驱节点
    struct zskiplistNode *backward;
    //保存每个节点中每一层指向的后继节点数组
    struct zskiplistLevel {
        //存放每一层的节点指针
        struct zskiplistNode *forward;
        //当前节点与后继节点跨过的节点数量
        unsigned int span;
    } level[];
} zskiplistNode;

//跳表结构体
typedef struct zskiplist {
    //指向跳表节点的头尾指针
    struct zskiplistNode *header, *tail;
    //跳表节点的个数
    unsigned long length;
    //当前跳表节点最大层数
    int level;
} zskiplist;

/* 创建跳表节点 */
zskiplistNode *zslCreateNode(int level, double score) {
    zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    return zn;
}

/* 初始化跳表-同时创建一个初始化头节点 */
zskiplist * zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = malloc(sizeof(*zsl));
    //设置当前默认层高为1 
    zsl->level = 1; 
    //节点数量
    zsl->length = 0; 
    //创建头结点
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0);
    //遍历头节点的每一层,初始化值
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
       zsl->header->level[j].forward = NULL;
       zsl->header->level[j].span = 0; 
    }    
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    //返回创建的跳表指针
    return zsl; 
}

/* 往跳表里面插入节点 */
zskiplistNode *zslInsert(zskiplist *zsl, double score) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
   
    //从高到低level层不断地判断找到当前节点插入的位置
    //注意:这块我稍作修改了下,跟源码有点区别,源码除了判断分数还有其他的判断条件.
    //     而我这里只判断了一个分数
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward && x->level[i].forward->score < score) {
            //记录当前插入的节点在每一层跳过的节点距离
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    //这块是我固定写死了,为了跟上图匹配,正常源码这块
    //全部都是随机生成的level    
    if (score == 20.0 || score == 10.0){
        level = 4;
    }
    if(score == 15.0){
        level = 2;
    }
    if(score == 17.0){
        level = 1;
    }
    if(score == 18.0){
        level = 3;
    }
    //level = ZSKIPLIST_MAXLEVEL;//zslRandomLevel();
    //end...    

    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    
    //printf("%d-%d-%d-%d\n",rank[3],rank[2],rank[1],rank[0]);  
    
    //将当前的节点插入到跳跃表每一层里面
    x = zslCreateNode(level,score);
    for (i = 0; i < level; i++) {
        //更新每一层指针
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        //更新当前节点和指向当前节点(相当于后驱节点)的span值.
        //更新当前节点span         
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        //更新后驱节点span
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
       
        //printf("%d--%d\n",x->level[i].span,update[i]->level[i].span);
    }
    
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    
    //设置 后退指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    //关联头尾
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    //增加跳表长度
    zsl->length++;
    return x;
}

/* 随机生成level层算法 */
int zslRandomLevel(void) {
    int level = 1;
#define ZSKIPLIST_P 0.25
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

//入口
int main(){
     //我们利用redis提供的跳表源码,模拟插入一些节点
    zskiplist* zsl = zslCreate();
    
    zslInsert(zsl,10.0);
    zslInsert(zsl,25.0);
    zslInsert(zsl,15.0);
    zslInsert(zsl,17.0);
    zslInsert(zsl,18.0);
    zslInsert(zsl,20.0);
    
    //打印出跳表结构
    int i = 0;
    zskiplistNode* p = zsl->header; 
    zskiplistNode* x = p;   
    for (i = zsl->level-1; i >= 0; i--) {
        printf("level%d\t",i+1);
        while (x->level[i].forward) {
            printf("%.1f-%d\t",x->score,x->level[i].span);
            x = x->level[i].forward;
        }
        printf("%.1f-%d\n",x->score,x->level[i].span);
        x = p;  
    }
}

运行结果-打印出跟上图一样的跳表结构

跳表结构

上面的代码如果有兴趣可以直接拿下来修改调试,然后最后编译运行就可以了.

下面看下 zrange 命令如何去跳跃表-范围查找的

//zrange 命令底层会调用此函数
void zrangeGenericCommand(redisClient *c, int reverse) {
      //.....省略
      
      // 处理ziplist编码的情况
      if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
          //.....
      } 
      // 处理skiplist编码的情况
      else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
           zset *zs = zobj->ptr;
           zskiplist *zsl = zs->zsl;
           zskiplistNode *ln;
           robj *ele;

           //根据迭代对象取出第一个开始的迭代节点
           if (reverse) {
              ln = zsl->tail;
              if (start > 0)
                  //通过传入的起始位置快速去跳跃表定位这个起始位置节点指针
                  ln = zslGetElementByRank(zsl,llen-start);
              } else {
                  ln = zsl->header->level[0].forward;
                  if (start > 0)
                  ln = zslGetElementByRank(zsl,start+1);
               }

            // 依次迭代,取出相应的元素
            while(rangelen--) {
            redisAssertWithInfo(c,zobj,ln != NULL);
            // 取出元素值
            ele = ln->obj;
            addReplyBulk(c,ele);
            if (withscores)
                addReplyDouble(c,ln->score);
            // 根据迭代方向去的前一个节点或者后一个节点
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
      }

     //......省略
}

zslGetElementByRank() 范围查找函数

zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    // 记录当前的排位信息
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    //可以看到这块代码就是我们上面介绍的根据span去查找起始位置
    //从高level到低level层利用每一层每一个节点的span跨度去累加判断
    for (i = zsl->level-1; i >= 0; i--) {
        //不断的累加span然后判断是否小于等于我们的起始位置
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        //判断是否传入的起始位置
        if (traversed == rank) {
            return x;
        }
        //如果不等于则按当前的节点位置降低level层数继续查找
    }
    return NULL;
}

结束

通过以上代码分析可以看出来redis有序集合的范围查找性能还是不错的,而且还是全部基于内存,所以感觉如果想高速分页的话,可以把ID全部丢到这个有序集合里面,然后每次zrange去范围获取就行了.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容