持久化
rdb是redis的一种持久化的方案,他每隔一段时间将redisdb里面的数据序列化到硬盘中保存。所以喃序列化就是关键。
在Redisdb里面保存的都是key value数据 . 序列化的话我们其实就是key value 对于key来说保存的是string的对象,可以直接存储,对于value来说,是一个redisobjet 这个就是有很多的类型了,可以是string可以是hash,可以是set。所以先来看下对于Redisobject的序列化。
RedisObject的序列化
我们都知道我们的RedisObject里面可以存储很多不同类型的变量。所以我们序列化有一点是必须的,那就是我们要保存类型。没有类型靠内存二进制反向出来内容,操作性基本不大现实,所以先来看下类型的定义
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2
#define RDB_TYPE_ZSET 3
#define RDB_TYPE_HASH 4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
#define RDB_ENC_INT8 0 /* 8 bit signed integer */
#define RDB_ENC_INT16 1 /* 16 bit signed integer */
#define RDB_ENC_INT32 2 /* 32 bit signed integer */
#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */
这个定义贼简洁明了就不多说了看名字就看出来了
再来看下对类型的保存
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {//先获取主类型
case OBJ_STRING://string
return rdbSaveType(rdb,RDB_TYPE_STRING);//保存成string
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)//quicklist
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET://set 有两种保存方式 intset 和 hashtable
if (o->encoding == OBJ_ENCODING_INTSET)//
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET://zet 也有两种形式 ziplist 和 skiplist
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH://hash table 也是两种 ziplist hashtable
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}
保存完类型之后我们需要的是保存的是我们的value。 我们的value有很多很多类型。对于我们的value来说不管是string hashtable list这些都有一个长度在里面,我们需要知道我们保持的内容有多少。
这个长度对于不同的个体来说代表的意义不一样。比如对于string肯定是字符才长度,对于hashtable肯定是键值对的个数。
现在来看对len的编码,其实我们对于编码还是见过很多了ziplist那些其实都是大同小异的过一下就好
#define RDB_6BITLEN 0 //六位保存
#define RDB_14BITLEN 1 //十四位保存
#define RDB_32BITLEN 0x80 //4字节保存
#define RDB_64BITLEN 0x81 //8字节保存
#define RDB_ENCVAL 3 //保存的是数字
#define RDB_LENERR UINT64_MAX
//保存长度
int rdbSaveLen(rio *rdb, uint64_t len) {
unsigned char buf[2];
size_t nwritten;
if (len < (1<<6)) {
/* Save a 6 bit len */
buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); //高两位为00
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
nwritten = 1;
} else if (len < (1<<14)) {
/* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);//高两位为01
buf[1] = len&0xFF;
if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
nwritten = 2;
} else if (len <= UINT32_MAX) {
/* Save a 32 bit len */
buf[0] = RDB_32BITLEN;//高两位为10
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
uint32_t len32 = htonl(len);
if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
nwritten = 1+4;
} else {
/* Save a 64 bit len */
buf[0] = RDB_64BITLEN;//高两位为10 最后一位为1
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
len = htonu64(len);
if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
nwritten = 1+8;
}
return nwritten;
}
//保存数字
int rdbEncodeInteger(long long value, unsigned char *enc) { //高两位做为int的标致位
if (value >= -(1<<7) && value <= (1<<7)-1) {//一位可以保存
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8; // 高两位为11
enc[1] = value&0xFF;//
return 2;
} else if (value >= -(1<<15) && value <= (1<<15)-1) {//两位可以保存
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
return 3;
} else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {//四位保存
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
enc[3] = (value>>16)&0xFF;
enc[4] = (value>>24)&0xFF;
return 5;
} else {
return 0;
}
}
对于长度来说这是一个数字,在后面我们会发现在保存字符串的时候,如果我们的字符串可以转换成一个数字的话会保存成一个数字。因为这样子可以节约空间
保存是数字还是长度的区别在于第一个字符的高两位
高两位为11 这时候就是代表是数字
来看下具体的获取数字
//获取保存数字的长度和保存的是一个单纯的数字还是一个长度
//isencoded 返回代表是一个数字还是长度
//lenptr 在isencoded为0会读取具的长度当为1的时候没有读取具体的值只是读取值保存的长度
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2];
int type;
if (isencoded) *isencoded = 0;
if (rioRead(rdb,buf,1) == 0) return -1;
type = (buf[0]&0xC0)>>6; //获取高两位
if (type == RDB_ENCVAL) {
/* Read a 6 bit encoding type. */
if (isencoded) *isencoded = 1;//代表是返回的类型
*lenptr = buf[0]&0x3F; //获取低六位 低六位保存的是类型
} else if (type == RDB_6BITLEN) {//高两位为0 代表后六位为长度
/* Read a 6 bit len. */
*lenptr = buf[0]&0x3F;//拉取长度
} else if (type == RDB_14BITLEN) {//高两位为01 代表后十四位位长度
/* Read a 14 bit len. */
if (rioRead(rdb,buf+1,1) == 0) return -1;
*lenptr = ((buf[0]&0x3F)<<8)|buf[1];
} else if (buf[0] == RDB_32BITLEN) {// 判断是32位的类型
/* Read a 32 bit len. */
uint32_t len;
if (rioRead(rdb,&len,4) == 0) return -1;//读取四位
*lenptr = ntohl(len);//获取长度
} else if (buf[0] == RDB_64BITLEN) {//64位类型
/* Read a 64 bit len. */
uint64_t len;
if (rioRead(rdb,&len,8) == 0) return -1;//读取八位
*lenptr = ntohu64(len);
} else {//错误的类型
rdbExitReportCorruptRDB(
"Unknown length encoding %d in rdbLoadLen()",type);
return -1; /* Never reached. */
}
return 0;
}
在长度的获取中我们首先进行的是高两位的校验
当发现高两位是RDB_ENCVAL他会设置标识这是一个数字,并且lenptr也仅仅是保存了数字保存的长度并没有获取具体的值
当高两位非RDB_ENCVAL会获取出具体的len的值当返回值为-1代表失败0代表成功。
Save Object
在Redis保存对象的过程中他保存类型和保存值是分开的。他是先调用saveobjecttype先把类型保存起来之后,再调用saveobject来保存值。所以我们后面来直接看值的保存。
SaveString
对于string来说,保存存在三种情况。
1.这个string是否是一个数字或者是否能够被转换成一个数字来进行保存。
2这个string是否长,是否打开了压缩标记。如果是就进行压缩保存。
3这个string进行一个普通的字符串保存。
第一种是数字进行保存
ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
unsigned char buf[32];
ssize_t n, nwritten = 0;
int enclen = rdbEncodeInteger(value,buf);//对数字进行编码
if (enclen > 0) {//编码成功
return rdbWriteRaw(rdb,buf,enclen);//直接写入buf里面的内容
} else {
/* Encode as string */
enclen = ll2string((char*)buf,32,value);//转换才string
serverAssert(enclen < 32);
if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;//先写入len 返回花费的长度
nwritten += n;
if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;//写入内容
nwritten += n;
}
return nwritten;//返回写入的长度
}
第二中压缩保存
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
size_t comprlen, outlen;
void *out;
/* We require at least four bytes compression for this to be worth it */
if (len <= 4) return 0;
outlen = len-4;
if ((out = zmalloc(outlen+1)) == NULL) return 0;//分配压缩后的空间
comprlen = lzf_compress(s, len, out, outlen);//压缩
if (comprlen == 0) {//压缩失败
zfree(out);
return 0;
}
ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);//写入压缩后的文件
zfree(out);
return nwritten;
}
//保存压缩对象
//标记位为RDB_ENCVAL<<6)|RDB_ENC_LZF;
//然后保存压缩后长度 然后写入原来的长度
//最后写入压缩后的二进制 长度为压缩后的长度
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
size_t original_len) {
unsigned char byte;
ssize_t n, nwritten = 0;
/* Data compressed! Let's save it on disk */
byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF; //保存格式 压缩标记高两位也是用的11 低两位使用的11 在loadlen 代码可以参考
if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;//首先写入压缩标记
nwritten += n;
if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;//写入压缩后的长度
nwritten += n;
if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;//写入原来的长度
nwritten += n;
if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;//写入压缩后的二进制
nwritten += n;
return nwritten;
writeerr:
return -1;
}
第三种保存普通的string对象
//没有特别的操作 写入长度写入二进制
/* Store verbatim */
if ((n = rdbSaveLen(rdb,len)) == -1) return -1;//保存长度
nwritten += n;
if (len > 0) {
if (rdbWriteRaw(rdb,s,len) == -1) return -1;//写入二进制
nwritten += len;
}
最后我们来看下string整体的保存和判断保存类型流程
//首先保存入口
ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
//首先判断是不是一个数字
if (obj->encoding == OBJ_ENCODING_INT) {//这是一个int的值
return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
} else {
serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
//写入rawstring
return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));//写入
}
}
//保存数字的上面已经有了 现在看下raw
/* Save a string object as [len][data] on disk. If the object is a string
* representation of an integer value we try to save it in a special form */
//保存rawstring
//在raw主要进行了两次判断
//第一次判断这个字符串能否转换成一个数字进行保存
//第二次判断是判断是否能够压缩然后进行压缩保存
//都不行的话才进行普通的string保存
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
int enclen;
ssize_t n, nwritten = 0;
/* Try integer encoding */
if (len <= 11) {//首先这个还是蛮小的
unsigned char buf[5];
if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {//尝试转换成int
if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;//如果是int buf已经被编码了 写入就行了
return enclen;
}
}
/* Try LZF compression - under 20 bytes it's unable to compress even
* aaaaaaaaaaaaaaaaaa so skip it */
if (server.rdb_compression && len > 20) {//如果配置了压缩选项
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
/* Return value of 0 means data can't be compressed, save the old way */
}
/* Store verbatim */
if ((n = rdbSaveLen(rdb,len)) == -1) return -1;//没有压缩标记或长度蛮短的 先写入长度
nwritten += n;
if (len > 0) {
if (rdbWriteRaw(rdb,s,len) == -1) return -1;//写入二进制
nwritten += len;
}
return nwritten;
}
其他Object的保存
ssize_t rdbSaveObject(rio *rdb, robj *o) { //save object
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {//string
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
} else if (o->type == OBJ_LIST) {//这个逗比是个list
/* Save a list value */
//对于我们的quicklist来说 他的每一个节点是一个quicklistnode 他的数据区域是使用的ziplist
//ziplist是一个连续的内存块 所以在为压缩的node节点将将ziplist当成一个string来保存
//如果是一个压缩的节点,使用blob的方式的保存
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;
if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;//先写入长度
nwritten += n;
while(node) {
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);//获得压缩后的长度和压缩的data
//使用压缩字符串的保存方式
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;//写入标记为压缩信息
nwritten += n;
} else {
//将ziplist当成是一个string直接保存内存块
if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;//直接写入
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
} else if (o->type == OBJ_SET) {//写入set
//对于set 来说 如果我们使用的是hashtable 那么我们只需要遍历一遍set 把他的key全部写入就好了
//如果是intset 这个玩意就简单了 因为他是一个连续的二进制 当成一个string写入
/* Save a set value */
if (o->encoding == OBJ_ENCODING_HT) {//这玩意是hash
dict *set = o->ptr;//获取dict
dictIterator *di = dictGetIterator(set);//获取iter
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;//保存长度
nwritten += n;
while((de = dictNext(di)) != NULL) {//遍历
sds ele = dictGetKey(de);//拉取key set只有key 具体可看set的保存方式
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))//
== -1) return -1;
nwritten += n;
}
dictReleaseIterator(di);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);//这个就很6了 直接把这个玩意当成一个二进制锤进去
//当成string 进行存储
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else {
serverPanic("Unknown set encoding");
}
} else if (o->type == OBJ_ZSET) {
/* Save a sorted set value */
//第一种情况是一个ziplst 连续的内存 直接把ziplist当成一个string写入
//第二章情况
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);//连续的内存 编码什么的 不存在的直接锤击去
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;//锤进去
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
zskiplist *zsl = zs->zsl;
if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;//先写入长度
nwritten += n;
/* We save the skiplist elements from the greatest to the smallest
* (that's trivial since the elements are already ordered in the
* skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
//这里将只会保存key和source信息 其他信息将会被抛弃
//其他信息可以在读取的时候 重新购照
zskiplistNode *zn = zsl->tail;
while (zn != NULL) {
if ((n = rdbSaveRawString(rdb,
(unsigned char*)zn->ele,sdslen(zn->ele))) == -1)//首先写入key的信息
{
return -1;
}
nwritten += n;
if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) //再写入一个double信息 这里没有进行编码 因为key的长度知道 在我们读取的时候 将会是一个固定的读取方式 先读取出string之后 紧接着会读取到一个double 如果不满足说明是文件有问题
return -1;
nwritten += n;
zn = zn->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (o->type == OBJ_HASH) {
/* Save a hash value */
//对于hash来说 ziplist 就直接当成string写入
//hashtable 获取key value 挨着当成string写入
if (o->encoding == OBJ_ENCODING_ZIPLIST) { //ziplist
size_t l = ziplistBlobLen((unsigned char*)o->ptr);//连续内容
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;//直接锤进去
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;//写入长度
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds field = dictGetKey(de);//获取key
sds value = dictGetVal(de);//获取value
if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
sdslen(field))) == -1) return -1;//锤入key
nwritten += n;
if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
sdslen(value))) == -1) return -1; //锤入value
nwritten += n;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown hash encoding");
}
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1) return -1;
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
} else {
serverPanic("Unknown object type");
}
return nwritten;
}
对对象的存储可以总结下
首先存储的是对象的类型,对应的是SaveObjetType方法这个方法其实就是把类型编号存储文件。
然后存储Object的大小对于大小这个类型来说类型不同是不一样的。对于string来说将会存储的是一个string的长度,但是对于list来说就会是这个list的元素格式。具体会以类型和编码为主。
在存储过程中遵循如果是连续的内存块就使用string的存储方式。这样如果配置了压缩可以使用到压缩节约内存。而且当读取的时候也可以根据C语言的内存模型直接反向出原类型。
当不是连续内存块的时候就度取出关键类型。当做string来存储(除了zset的那个source)。
文件存储redisDb
现在我们知道了对于对象来说Redis的存储方式,对于我们的redisDb来说他就是一系列的key value,key是一个string value是个object所以存储来说就会很简单
在存储具体的值之前我们的db还需要一列额外的记录,比如当前是哪个db啊。我们的这个key是不是有超时信息。所以可以看下这部分是怎么实现的
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
//对于这些值来将会被当做type来存储 在load的时候读取到这些类型将会被特殊处理
#define RDB_OPCODE_AUX 250 //设备信息 代表后面是两个string作为键值对保存的设备的一个信息
#define RDB_OPCODE_RESIZEDB 251 //db的大小信息 代表后面会保存两个长度信息 一个保存数据的dict的大小 一个保存过期键的信息
#define RDB_OPCODE_EXPIRETIME_MS 252 //过期信息毫秒 代表后面将会读取一个int64大小的数字
#define RDB_OPCODE_EXPIRETIME 253 //过期时间秒 后面会跟一个int32大小的时间
#define RDB_OPCODE_SELECTDB 254 //选择数据库
#define RDB_OPCODE_EOF 255 //数据库保存结束标记
/* Save an AUX field. */
//保存设备的一些信息 后面跟两个string load的时候之间读取
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;//标记为设备信息
len += ret;
if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;//写入key
len += ret;
if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;//写入value
len += ret;
return len;
}
//保存db信息 后面之间跟两个len信息
uint32_t db_size, expires_size;
db_size = (dictSize(db->dict) <= UINT32_MAX) ?
dictSize(db->dict) :
UINT32_MAX;
expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
dictSize(db->expires) :
UINT32_MAX;
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//记录数据库大小信息
if (rdbSaveLen(rdb,db_size) == -1) goto werr;//dbsize
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//expires_size
//select
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//进入db的时候将会使用select
if (rdbSaveLen(rdb,j) == -1) goto werr;//用len直接来保存db的编号
//RDB_OPCODE_EXPIRETIME_MS
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
int64_t t64 = (int64_t) t;
return rdbWriteRaw(rdb,&t64,8);
//RDB_OPCODE_EXPIRETIME
//没看到用 不过喃
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME) == -1) return -1;
int32_t t32 = (int32_t) t;
return rdbWriteRaw(rdb,&t32,4);
//RDB_OPCODE_EOF
rdbSaveType(rdb,RDB_OPCODE_EOF) ;//直接就是写了个标记
对于我们的没有redisDb的保存项存储的时候可以提取出三个东西key,value,expire。
来看下存储着三个东西的流程
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
long long expiretime, long long now)
{
/* Save the expire time */
if (expiretime != -1) {//如果有过期信息
/* If this key is already expired skip it */
if (expiretime < now) return 0;//如果已经过期了 肯定就不需要存储了
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;//首先存储expire信息
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;//value type
if (rdbSaveStringObject(rdb,key) == -1) return -1; //key
if (rdbSaveObject(rdb,val) == -1) return -1;//value
return 1;
}
对于我们的流程来说首先就是判断exipre是否存在是否过期等。然后才是存储我们的key value信息。
对于过期时间就是首先加上一个RDB_OPCODE_EXPIRETIME_MS后面之间跟上一个过期时间
对于key value 将首先存储的是value的类型信息 然后将会保存string类型的key 最后会调用我们的object保存方法保存我们的value
对于存在exipre 就会是 RDB_OPCODE_EXPIRETIME_MS exipre valuetype key value 这种格式保存的。
下面下整体的保存
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
* missing because of I/O errors.
*
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
long long now = mstime();//当前的时间
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum; //校验和
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);//首先写入redis版本号信息 版本使用%04将会占用4个字节
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;//redis 5个字节加上版本号四个字节为九个字节
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; //写入设备信息
for (j = 0; j < server.dbnum; j++) {//循环遍历所有的db
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (!di) return C_ERR;
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//进入db的时候将会使用select
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
* is currently the largest type we are able to represent in RDB sizes.
* However this does not limit the actual size of the DB to load since
* these sizes are just hints to resize the hash tables. */
uint32_t db_size, expires_size;
db_size = (dictSize(db->dict) <= UINT32_MAX) ?
dictSize(db->dict) :
UINT32_MAX;
expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
dictSize(db->expires) :
UINT32_MAX;
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//记录数据库大小信息
if (rdbSaveLen(rdb,db_size) == -1) goto werr;//dbsize
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//expires_size
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/* If we are storing the replication information on disk, persist
* the script cache as well: on successful PSYNC after a restart, we need
* to be able to process any EVALSHA inside the replication backlog the
* master will send us. */
if (rsi && dictSize(server.lua_scripts)) {//lua
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
}
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;//保存eof
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;//保存校验和
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
对于我们的保存的流程
1.保存魔数和version 将使用raw的方式直接写入固定长度
2.保存aux信息 aux信息有很多组都是固定的格式 保存aux标记后后面跟上两个string信息
3.开始保存一个db的信息。首先使用RDB_OPCODE_SELECTDB来保存db的id,后面是直接放置的一个len的数字。然后使用RDB_OPCODE_RESIZEDB标记来保存dbsize 和expires_size 。
3具体保存每一个key value 信息。如果存在expire信息就是会RDB_OPCODE_EXPIRETIME_MS int64(time) valuetype key value 这种格式存储。没有expire的话就是直接valuetype key value 保存信息。
4.保存eof信息这个就是直接保存一个标记
5.最后保存一个8字节的校验和