RedisDB 序列化

持久化

rdb是redis的一种持久化的方案,他每隔一段时间将redisdb里面的数据序列化到硬盘中保存。所以喃序列化就是关键。
在Redisdb里面保存的都是key value数据 . 序列化的话我们其实就是key value 对于key来说保存的是string的对象,可以直接存储,对于value来说,是一个redisobjet 这个就是有很多的类型了,可以是string可以是hash,可以是set。所以先来看下对于Redisobject的序列化。

RedisObject的序列化

我们都知道我们的RedisObject里面可以存储很多不同类型的变量。所以我们序列化有一点是必须的,那就是我们要保存类型。没有类型靠内存二进制反向出来内容,操作性基本不大现实,所以先来看下类型的定义

#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST   1
#define RDB_TYPE_SET    2
#define RDB_TYPE_ZSET   3
#define RDB_TYPE_HASH   4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without */
#define RDB_TYPE_HASH_ZIPMAP    9
#define RDB_TYPE_LIST_ZIPLIST  10
#define RDB_TYPE_SET_INTSET    11
#define RDB_TYPE_ZSET_ZIPLIST  12
#define RDB_TYPE_HASH_ZIPLIST  13
#define RDB_TYPE_LIST_QUICKLIST 14
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
#define RDB_ENC_INT8 0        /* 8 bit signed integer */
#define RDB_ENC_INT16 1       /* 16 bit signed integer */
#define RDB_ENC_INT32 2       /* 32 bit signed integer */
#define RDB_ENC_LZF 3         /* string compressed with FASTLZ */

这个定义贼简洁明了就不多说了看名字就看出来了
再来看下对类型的保存

int rdbSaveObjectType(rio *rdb, robj *o) {
    switch (o->type) {//先获取主类型
    case OBJ_STRING://string
        return rdbSaveType(rdb,RDB_TYPE_STRING);//保存成string
    case OBJ_LIST:
        if (o->encoding == OBJ_ENCODING_QUICKLIST)//quicklist
            return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
        else
            serverPanic("Unknown list encoding");
    case OBJ_SET://set 有两种保存方式 intset 和 hashtable
        if (o->encoding == OBJ_ENCODING_INTSET)//
            return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
        else if (o->encoding == OBJ_ENCODING_HT)
            return rdbSaveType(rdb,RDB_TYPE_SET);
        else
            serverPanic("Unknown set encoding");
    case OBJ_ZSET://zet 也有两种形式 ziplist 和 skiplist
        if (o->encoding == OBJ_ENCODING_ZIPLIST)
            return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
        else if (o->encoding == OBJ_ENCODING_SKIPLIST)
            return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
        else
            serverPanic("Unknown sorted set encoding");
    case OBJ_HASH://hash table 也是两种  ziplist hashtable
        if (o->encoding == OBJ_ENCODING_ZIPLIST)
            return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
        else if (o->encoding == OBJ_ENCODING_HT)
            return rdbSaveType(rdb,RDB_TYPE_HASH);
        else
            serverPanic("Unknown hash encoding");
    case OBJ_MODULE:
        return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
    default:
        serverPanic("Unknown object type");
    }
    return -1; /* avoid warning */
}

保存完类型之后我们需要的是保存的是我们的value。 我们的value有很多很多类型。对于我们的value来说不管是string hashtable list这些都有一个长度在里面,我们需要知道我们保持的内容有多少。
这个长度对于不同的个体来说代表的意义不一样。比如对于string肯定是字符才长度,对于hashtable肯定是键值对的个数。
现在来看对len的编码,其实我们对于编码还是见过很多了ziplist那些其实都是大同小异的过一下就好

#define RDB_6BITLEN 0  //六位保存
#define RDB_14BITLEN 1  //十四位保存
#define RDB_32BITLEN 0x80 //4字节保存
#define RDB_64BITLEN 0x81 //8字节保存
#define RDB_ENCVAL 3     //保存的是数字
#define RDB_LENERR UINT64_MAX
//保存长度
int rdbSaveLen(rio *rdb, uint64_t len) {
    unsigned char buf[2];
    size_t nwritten;

    if (len < (1<<6)) {
        /* Save a 6 bit len */
        buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); //高两位为00
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        nwritten = 1;
    } else if (len < (1<<14)) {
        /* Save a 14 bit len */
        buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);//高两位为01
        buf[1] = len&0xFF;
        if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
        nwritten = 2;
    } else if (len <= UINT32_MAX) {
        /* Save a 32 bit len */
        buf[0] = RDB_32BITLEN;//高两位为10
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        uint32_t len32 = htonl(len);
        if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
        nwritten = 1+4;
    } else {
        /* Save a 64 bit len */
        buf[0] = RDB_64BITLEN;//高两位为10 最后一位为1
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        len = htonu64(len);
        if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
        nwritten = 1+8;
    }
    return nwritten;
}
//保存数字
int rdbEncodeInteger(long long value, unsigned char *enc) { //高两位做为int的标致位
    if (value >= -(1<<7) && value <= (1<<7)-1) {//一位可以保存
        enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8; // 高两位为11
        enc[1] = value&0xFF;//
        return 2;
    } else if (value >= -(1<<15) && value <= (1<<15)-1) {//两位可以保存
        enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        return 3;
    } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {//四位保存
        enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        enc[3] = (value>>16)&0xFF;
        enc[4] = (value>>24)&0xFF;
        return 5;
    } else {
        return 0;
    }
}

对于长度来说这是一个数字,在后面我们会发现在保存字符串的时候,如果我们的字符串可以转换成一个数字的话会保存成一个数字。因为这样子可以节约空间
保存是数字还是长度的区别在于第一个字符的高两位
高两位为11 这时候就是代表是数字
来看下具体的获取数字

//获取保存数字的长度和保存的是一个单纯的数字还是一个长度
//isencoded 返回代表是一个数字还是长度
//lenptr 在isencoded为0会读取具的长度当为1的时候没有读取具体的值只是读取值保存的长度
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
    unsigned char buf[2];
    int type;

    if (isencoded) *isencoded = 0;
    if (rioRead(rdb,buf,1) == 0) return -1;
    type = (buf[0]&0xC0)>>6; //获取高两位
    if (type == RDB_ENCVAL) {
        /* Read a 6 bit encoding type. */
        if (isencoded) *isencoded = 1;//代表是返回的类型
        *lenptr = buf[0]&0x3F; //获取低六位  低六位保存的是类型
    } else if (type == RDB_6BITLEN) {//高两位为0  代表后六位为长度
        /* Read a 6 bit len. */
        *lenptr = buf[0]&0x3F;//拉取长度
    } else if (type == RDB_14BITLEN) {//高两位为01 代表后十四位位长度
        /* Read a 14 bit len. */
        if (rioRead(rdb,buf+1,1) == 0) return -1;
        *lenptr = ((buf[0]&0x3F)<<8)|buf[1];
    } else if (buf[0] == RDB_32BITLEN) {// 判断是32位的类型
        /* Read a 32 bit len. */
        uint32_t len;
        if (rioRead(rdb,&len,4) == 0) return -1;//读取四位
        *lenptr = ntohl(len);//获取长度
    } else if (buf[0] == RDB_64BITLEN) {//64位类型
        /* Read a 64 bit len. */
        uint64_t len;
        if (rioRead(rdb,&len,8) == 0) return -1;//读取八位
        *lenptr = ntohu64(len);
    } else {//错误的类型
        rdbExitReportCorruptRDB(
            "Unknown length encoding %d in rdbLoadLen()",type);
        return -1; /* Never reached. */
    }
    return 0;
}

在长度的获取中我们首先进行的是高两位的校验
当发现高两位是RDB_ENCVAL他会设置标识这是一个数字,并且lenptr也仅仅是保存了数字保存的长度并没有获取具体的值
当高两位非RDB_ENCVAL会获取出具体的len的值当返回值为-1代表失败0代表成功。

Save Object

在Redis保存对象的过程中他保存类型和保存值是分开的。他是先调用saveobjecttype先把类型保存起来之后,再调用saveobject来保存值。所以我们后面来直接看值的保存。

SaveString

对于string来说,保存存在三种情况。
1.这个string是否是一个数字或者是否能够被转换成一个数字来进行保存。
2这个string是否长,是否打开了压缩标记。如果是就进行压缩保存。
3这个string进行一个普通的字符串保存。

第一种是数字进行保存

ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
    unsigned char buf[32];
    ssize_t n, nwritten = 0;
    int enclen = rdbEncodeInteger(value,buf);//对数字进行编码
    if (enclen > 0) {//编码成功
        return rdbWriteRaw(rdb,buf,enclen);//直接写入buf里面的内容
    } else {
        /* Encode as string */
        enclen = ll2string((char*)buf,32,value);//转换才string
        serverAssert(enclen < 32);
        if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;//先写入len  返回花费的长度
        nwritten += n;
        if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;//写入内容
        nwritten += n;
    }
    return nwritten;//返回写入的长度
}

第二中压缩保存


ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
    size_t comprlen, outlen;
    void *out;

    /* We require at least four bytes compression for this to be worth it */
    if (len <= 4) return 0;
    outlen = len-4;
    if ((out = zmalloc(outlen+1)) == NULL) return 0;//分配压缩后的空间
    comprlen = lzf_compress(s, len, out, outlen);//压缩
    if (comprlen == 0) {//压缩失败
        zfree(out);
        return 0;
    }
    ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);//写入压缩后的文件
    zfree(out);
    return nwritten;
}
//保存压缩对象
//标记位为RDB_ENCVAL<<6)|RDB_ENC_LZF;
//然后保存压缩后长度 然后写入原来的长度
//最后写入压缩后的二进制 长度为压缩后的长度
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
                       size_t original_len) {
    unsigned char byte;
    ssize_t n, nwritten = 0;

    /* Data compressed! Let's save it on disk */
    byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF; //保存格式 压缩标记高两位也是用的11 低两位使用的11  在loadlen 代码可以参考
    if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;//首先写入压缩标记
    nwritten += n;

    if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;//写入压缩后的长度
    nwritten += n;

    if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;//写入原来的长度
    nwritten += n;

    if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;//写入压缩后的二进制
    nwritten += n;

    return nwritten;

writeerr:
    return -1;
}

第三种保存普通的string对象

//没有特别的操作 写入长度写入二进制
    /* Store verbatim */
    if ((n = rdbSaveLen(rdb,len)) == -1) return -1;//保存长度
    nwritten += n;
    if (len > 0) {
        if (rdbWriteRaw(rdb,s,len) == -1) return -1;//写入二进制
        nwritten += len;
    }

最后我们来看下string整体的保存和判断保存类型流程

//首先保存入口
ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
    /* Avoid to decode the object, then encode it again, if the
     * object is already integer encoded. */
    //首先判断是不是一个数字
    if (obj->encoding == OBJ_ENCODING_INT) {//这是一个int的值
        return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
    } else {
        serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
        //写入rawstring
        return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));//写入
    }
}
//保存数字的上面已经有了 现在看下raw

/* Save a string object as [len][data] on disk. If the object is a string
 * representation of an integer value we try to save it in a special form */
//保存rawstring
//在raw主要进行了两次判断
//第一次判断这个字符串能否转换成一个数字进行保存
//第二次判断是判断是否能够压缩然后进行压缩保存
//都不行的话才进行普通的string保存
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
    int enclen;
    ssize_t n, nwritten = 0;

    /* Try integer encoding */
    if (len <= 11) {//首先这个还是蛮小的
        unsigned char buf[5];
        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {//尝试转换成int
            if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;//如果是int  buf已经被编码了 写入就行了
            return enclen;
        }
    }

    /* Try LZF compression - under 20 bytes it's unable to compress even
     * aaaaaaaaaaaaaaaaaa so skip it */
    if (server.rdb_compression && len > 20) {//如果配置了压缩选项
        n = rdbSaveLzfStringObject(rdb,s,len);
        if (n == -1) return -1;
        if (n > 0) return n;
        /* Return value of 0 means data can't be compressed, save the old way */
    }

    /* Store verbatim */
    if ((n = rdbSaveLen(rdb,len)) == -1) return -1;//没有压缩标记或长度蛮短的 先写入长度
    nwritten += n;
    if (len > 0) {
        if (rdbWriteRaw(rdb,s,len) == -1) return -1;//写入二进制
        nwritten += len;
    }
    return nwritten;
}
其他Object的保存
ssize_t rdbSaveObject(rio *rdb, robj *o) { //save object
    ssize_t n = 0, nwritten = 0;

    if (o->type == OBJ_STRING) {//string
        /* Save a string value */
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
        nwritten += n;
    } else if (o->type == OBJ_LIST) {//这个逗比是个list
        /* Save a list value */
        //对于我们的quicklist来说 他的每一个节点是一个quicklistnode 他的数据区域是使用的ziplist
        //ziplist是一个连续的内存块 所以在为压缩的node节点将将ziplist当成一个string来保存
        //如果是一个压缩的节点,使用blob的方式的保存
        
        if (o->encoding == OBJ_ENCODING_QUICKLIST) {
            quicklist *ql = o->ptr;
            quicklistNode *node = ql->head;

            if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;//先写入长度
            nwritten += n;

            while(node) {
                if (quicklistNodeIsCompressed(node)) {
                    void *data;
                    size_t compress_len = quicklistGetLzf(node, &data);//获得压缩后的长度和压缩的data
                    //使用压缩字符串的保存方式
                    if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;//写入标记为压缩信息
                    nwritten += n;
                } else {
                    //将ziplist当成是一个string直接保存内存块
                    if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;//直接写入
                    nwritten += n;
                }
                node = node->next;
            }
        } else {
            serverPanic("Unknown list encoding");
        }
    } else if (o->type == OBJ_SET) {//写入set
        //对于set 来说 如果我们使用的是hashtable 那么我们只需要遍历一遍set 把他的key全部写入就好了
        //如果是intset 这个玩意就简单了 因为他是一个连续的二进制 当成一个string写入
        /* Save a set value */
        if (o->encoding == OBJ_ENCODING_HT) {//这玩意是hash
            dict *set = o->ptr;//获取dict
            dictIterator *di = dictGetIterator(set);//获取iter
            dictEntry *de;

            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;//保存长度
            nwritten += n;

            while((de = dictNext(di)) != NULL) {//遍历
                sds ele = dictGetKey(de);//拉取key set只有key 具体可看set的保存方式
                if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))//
                    == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else if (o->encoding == OBJ_ENCODING_INTSET) {
            size_t l = intsetBlobLen((intset*)o->ptr);//这个就很6了 直接把这个玩意当成一个二进制锤进去
            //当成string 进行存储
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else {
            serverPanic("Unknown set encoding");
        }
    } else if (o->type == OBJ_ZSET) {
        /* Save a sorted set value */
        //第一种情况是一个ziplst 连续的内存 直接把ziplist当成一个string写入
        //第二章情况
        if (o->encoding == OBJ_ENCODING_ZIPLIST) {
            
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);//连续的内存 编码什么的 不存在的直接锤击去

            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;//锤进去
            nwritten += n;
        } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = o->ptr;
            zskiplist *zsl = zs->zsl;

            if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;//先写入长度
            nwritten += n;

            /* We save the skiplist elements from the greatest to the smallest
             * (that's trivial since the elements are already ordered in the
             * skiplist): this improves the load process, since the next loaded
             * element will always be the smaller, so adding to the skiplist
             * will always immediately stop at the head, making the insertion
             * O(1) instead of O(log(N)). */
            //这里将只会保存key和source信息 其他信息将会被抛弃
            //其他信息可以在读取的时候 重新购照
            zskiplistNode *zn = zsl->tail;
            while (zn != NULL) {
                if ((n = rdbSaveRawString(rdb,
                      (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)//首先写入key的信息
                {
                    return -1;
                }
                nwritten += n;
                if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) //再写入一个double信息 这里没有进行编码 因为key的长度知道 在我们读取的时候 将会是一个固定的读取方式 先读取出string之后 紧接着会读取到一个double 如果不满足说明是文件有问题
                    return -1;
                nwritten += n;
                zn = zn->backward;
            }
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    } else if (o->type == OBJ_HASH) {
        /* Save a hash value */
        //对于hash来说  ziplist 就直接当成string写入
        //hashtable 获取key value 挨着当成string写入
        if (o->encoding == OBJ_ENCODING_ZIPLIST) { //ziplist
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);//连续内容

            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;//直接锤进去
            nwritten += n;

        } else if (o->encoding == OBJ_ENCODING_HT) {
            dictIterator *di = dictGetIterator(o->ptr);
            dictEntry *de;

            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;//写入长度
            nwritten += n;

            while((de = dictNext(di)) != NULL) {
                sds field = dictGetKey(de);//获取key
                sds value = dictGetVal(de);//获取value
                
                if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
                        sdslen(field))) == -1) return -1;//锤入key
                nwritten += n;
                if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
                        sdslen(value))) == -1) return -1; //锤入value
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else {
            serverPanic("Unknown hash encoding");
        }

    } else if (o->type == OBJ_MODULE) {
        /* Save a module-specific value. */
        RedisModuleIO io;
        moduleValue *mv = o->ptr;
        moduleType *mt = mv->type;
        moduleInitIOContext(io,mt,rdb);

        /* Write the "module" identifier as prefix, so that we'll be able
         * to call the right module during loading. */
        int retval = rdbSaveLen(rdb,mt->id);
        if (retval == -1) return -1;
        io.bytes += retval;

        /* Then write the module-specific representation + EOF marker. */
        mt->rdb_save(&io,mv->value);
        retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
        if (retval == -1) return -1;
        io.bytes += retval;

        if (io.ctx) {
            moduleFreeContext(io.ctx);
            zfree(io.ctx);
        }
        return io.error ? -1 : (ssize_t)io.bytes;
    } else {
        serverPanic("Unknown object type");
    }
    return nwritten;
}

对对象的存储可以总结下
首先存储的是对象的类型,对应的是SaveObjetType方法这个方法其实就是把类型编号存储文件。
然后存储Object的大小对于大小这个类型来说类型不同是不一样的。对于string来说将会存储的是一个string的长度,但是对于list来说就会是这个list的元素格式。具体会以类型和编码为主。
在存储过程中遵循如果是连续的内存块就使用string的存储方式。这样如果配置了压缩可以使用到压缩节约内存。而且当读取的时候也可以根据C语言的内存模型直接反向出原类型。
当不是连续内存块的时候就度取出关键类型。当做string来存储(除了zset的那个source)。

文件存储redisDb

现在我们知道了对于对象来说Redis的存储方式,对于我们的redisDb来说他就是一系列的key value,key是一个string value是个object所以存储来说就会很简单
在存储具体的值之前我们的db还需要一列额外的记录,比如当前是哪个db啊。我们的这个key是不是有超时信息。所以可以看下这部分是怎么实现的

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
//对于这些值来将会被当做type来存储 在load的时候读取到这些类型将会被特殊处理
#define RDB_OPCODE_AUX        250 //设备信息 代表后面是两个string作为键值对保存的设备的一个信息
#define RDB_OPCODE_RESIZEDB   251 //db的大小信息 代表后面会保存两个长度信息 一个保存数据的dict的大小 一个保存过期键的信息
#define RDB_OPCODE_EXPIRETIME_MS 252 //过期信息毫秒 代表后面将会读取一个int64大小的数字
#define RDB_OPCODE_EXPIRETIME 253 //过期时间秒 后面会跟一个int32大小的时间
#define RDB_OPCODE_SELECTDB   254  //选择数据库
#define RDB_OPCODE_EOF        255 //数据库保存结束标记


/* Save an AUX field. */
//保存设备的一些信息 后面跟两个string load的时候之间读取
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
    ssize_t ret, len = 0;
    if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;//标记为设备信息
    len += ret;
    if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;//写入key
    len += ret;
    if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;//写入value
    len += ret;
    return len;
}
//保存db信息 后面之间跟两个len信息
     uint32_t db_size, expires_size;
        db_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                dictSize(db->dict) :
                                UINT32_MAX;
        expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
                                dictSize(db->expires) :
                                UINT32_MAX;
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//记录数据库大小信息
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;//dbsize
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//expires_size

//select
        /* Write the SELECT DB opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//进入db的时候将会使用select
        if (rdbSaveLen(rdb,j) == -1) goto werr;//用len直接来保存db的编号

//RDB_OPCODE_EXPIRETIME_MS
 if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
         int64_t t64 = (int64_t) t;
    return rdbWriteRaw(rdb,&t64,8);

//RDB_OPCODE_EXPIRETIME 
//没看到用 不过喃
 if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME) == -1) return -1;
    int32_t t32 = (int32_t) t;
    return rdbWriteRaw(rdb,&t32,4);

//RDB_OPCODE_EOF
rdbSaveType(rdb,RDB_OPCODE_EOF) ;//直接就是写了个标记

对于我们的没有redisDb的保存项存储的时候可以提取出三个东西key,value,expire。
来看下存储着三个东西的流程

/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actually saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    if (expiretime != -1) {//如果有过期信息
        /* If this key is already expired skip it */
        if (expiretime < now) return 0;//如果已经过期了 肯定就不需要存储了
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;//首先存储expire信息
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;//value type
    if (rdbSaveStringObject(rdb,key) == -1) return -1; //key
    if (rdbSaveObject(rdb,val) == -1) return -1;//value
    return 1;
}

对于我们的流程来说首先就是判断exipre是否存在是否过期等。然后才是存储我们的key value信息。
对于过期时间就是首先加上一个RDB_OPCODE_EXPIRETIME_MS后面之间跟上一个过期时间
对于key value 将首先存储的是value的类型信息 然后将会保存string类型的key 最后会调用我们的object保存方法保存我们的value
对于存在exipre 就会是 RDB_OPCODE_EXPIRETIME_MS exipre valuetype key value 这种格式保存的。
下面下整体的保存


/* Produces a dump of the database in RDB format sending it to the specified
 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
 * is returned and part of the output, or all the output, can be
 * missing because of I/O errors.
 *
 * When the function returns C_ERR and if 'error' is not NULL, the
 * integer pointed by 'error' is set to the value of errno just after the I/O
 * error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    long long now = mstime();//当前的时间
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum; //校验和
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);//首先写入redis版本号信息 版本使用%04将会占用4个字节
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;//redis 5个字节加上版本号四个字节为九个字节
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; //写入设备信息

    for (j = 0; j < server.dbnum; j++) {//循环遍历所有的db
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) return C_ERR;

        /* Write the SELECT DB opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//进入db的时候将会使用select
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint32_t db_size, expires_size;
        db_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                dictSize(db->dict) :
                                UINT32_MAX;
        expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
                                dictSize(db->expires) :
                                UINT32_MAX;
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//记录数据库大小信息
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;//dbsize
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//expires_size

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don't release it again on error. */

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {//lua
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
    }

    /* EOF opcode */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;//保存eof

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;//保存校验和
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

对于我们的保存的流程
1.保存魔数和version 将使用raw的方式直接写入固定长度
2.保存aux信息 aux信息有很多组都是固定的格式 保存aux标记后后面跟上两个string信息
3.开始保存一个db的信息。首先使用RDB_OPCODE_SELECTDB来保存db的id,后面是直接放置的一个len的数字。然后使用RDB_OPCODE_RESIZEDB标记来保存dbsize 和expires_size 。
3具体保存每一个key value 信息。如果存在expire信息就是会RDB_OPCODE_EXPIRETIME_MS int64(time) valuetype key value 这种格式存储。没有expire的话就是直接valuetype key value 保存信息。
4.保存eof信息这个就是直接保存一个标记
5.最后保存一个8字节的校验和

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 分布式缓存技术PK:选择Redis还是Memcached? 经平台同意授权转载 作者:田京昆(腾讯后台研发工程师)...
    meng_philip123阅读 68,921评论 7 60
  • 之前网上搜索到很多的方法,但是会出问题,这里分享下! -(UIImage *)cvsamplebufferrefT...
    冬的天阅读 3,302评论 1 0
  • 秋,悄然而来 夏天,在几度花开的火热里 渐渐失去了绚烂的色彩 早起的一缕秋风 携来黄昏凉爽的雨袂 谁采西天的晚霞别...
    晨风世界阅读 323评论 0 0
  • 今晚听了小芳的分享会。 以前一直在微博上关注她,之前的几届分享会都是在厦门现场开,今年第一次尝试用音频的形式做微信...
    Lily5566阅读 173评论 0 0