简介
redisDb作为整个redis缓存存储的核心。保存着我们客户端所有的需要的缓存数据。来一起了解下。
数据结构
typedef struct redisDb {
dict *dict; /* The keyspace for this DB *///保持数据的dict
dict *expires; /* Timeout of keys with a timeout set *///保持key的过期信息
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*///一些同步的keys
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
初始化
初始化在initServer里面
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
我们也需要关注下创建各个dict的时候的dicttpe,这块对内存那块理解需要
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};
/* Db->expires */
dictType keyptrDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
/* Keylist hash table type has unencoded redis objects as keys and
* lists as values. It's used for blocking operations (BLPOP) and to
* map swapped keys to a list of clients waiting for this keys to be loaded. */
dictType keylistDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictObjectDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
/* Generic hash table type where keys are Redis Objects, Values
* dummy pointers. */
dictType objectKeyPointerValueDictType = {
dictEncObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictObjectDestructor, /* key destructor */
NULL /* val destructor */
};
基本操作
查询
对于我们的db来说提供的功能就是查询,插入,覆盖等基本操作。在db层其实并不关心对于插入数据的type,在db看来每一个插入的数据都是redisobject.这样也可以统一进行管理。一般是在数据成功被查询出来之后。后面的 逻辑自己去做类型判断。
redis对每一个键提供了Expire功能。因为提供了这个过期机制。所以我们的查询肯定就是都会需要来判断这个键值对有米有过期。米有过期的数据才是有效的数据。
Expire
Expire定义在struct redisDb里面定义为dict *expires;所以可以看出来这就是一个简单的hashtable来保持了一份过期信息
setExpire
1.首先必须保证我们的dict里面设置的key存在
2.查询出原来保存expire的信息。没有的话就创建一个信息
3.设置新的expire的信息
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);//首先查询一下这个我们设置的这个key在dict里面是否存在。不存在设置个expire就感觉是搞笑卅
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));//有的话返回原来保持的过期信息没有就创建一个新的
dictSetSignedIntegerVal(de,when);//设置val为过期时间
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
getExpire
这个方法其实就是直接读取hashtable的保存的信息
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;//在expires里面查找 key
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
return dictGetSignedIntegerVal(de);//获取integerval
}
removeExpire
删除的时候有个强制要求就是你调用这个方法代表这个过期时间必须存在
int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);//必须存在这个key的过期信息
return dictDelete(db->expires,key->ptr) == DICT_OK;//调用删除 因为只是一个数字信息不存在什么异步的情况
}
expireIfNeeded
这个方法主要是检查过期时间,如果过期的话就删除。
如果是loading状态,就不管过不过期啥的
如果是script的访问。他是使用lua_time_start来计算是否过期。
如果是子节点,过期了也不管。只管返回是不是过期就行了。否则的话就需要做删除操作。
最后通知salve和aof文件
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);//获取expire
mstime_t now;
if (when < 0) return 0; /* No expire for this key */ //小于0代表没有这个key的expire信息
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;// server.loading 不能exipre
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();//lua将使用lua开始的时间作为now 防止用着用着被干掉了
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when; //作为是一个子节点就只做不叫不做更改 now>when 返回1 now<when 返回0
/* Return when this key has not expired */
if (now <= when) return 0; //还没有过期返回0
/* Delete the key */
server.stat_expiredkeys++; //更新stat_expiredkeys
propagateExpire(db,key,server.lazyfree_lazy_expire);//把消息发送到aof file和salve
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id); //notify
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : //同步或者异步删除
dbSyncDelete(db,key);
}
expire就到这里。不过这里的话都是被动删除。当我们的内存过期的时候。我们需要把它释放掉。不释放的话,会占着内存。我们这里看到的是我们被动的查询的时候来做的删除。所以程序里面还会有一个主动删除的机制。会在后面写。
查询
lookupKey
这个方法属于最低级的查询api了,他提供查询功能。并且根据标志位来确定是不是更新LFU或者lru
需要注意的是这个方法没有判断是不是expire,所以需要配合expireIfNeeded来用的
robj *lookupKey(redisDb *db, robj *key, int flags) { //查询key
dictEntry *de = dictFind(db->dict,key->ptr);//调用dict的查询方法
if (de) {//如果存在
robj *val = dictGetVal(de);//获取value
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.rdb_child_pid == -1 && //看下有没有saving 进程和是否需要更改lru
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}
lookupKeyReadWithFlags
这个函数因为加了expire操作,所以有点特别。因为为了主从一致性,在salve里面即使发现过期key,也是不会删除的。这个删除需要master主动来通知。所以在遇到查询的时候,如果不是master的链接,或者这个请求只是个readonly的请求。可以安全的返回个null没啥毛病。但是当是masetr的请求并且不是readonly的请求,就需要原样的返回。比如这个请求就是来删除他的喃,你给他返回不存在,不科学卅。
在最后的时候更新miss和hit
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) { //如果这个玩意存在 并且被删除了
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) return NULL;
//进入的情况就是就是这个玩意是slave 他不得去删除这个key的
/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessign expired values in a read-only fashion, that
* will say the key as non exisitng.
*
* Notably this covers GETs when slaves are used to scale reads. */
//对于非master 或者不readonly的commnad请求可以安全的返回null
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY) //这个commnad 不是master 并且不是readonly就给他返回null
{
return NULL;
} //是master 的非readonly 肯定就要干事情
}
val = lookupKey(db,key,flags);//查
if (val == NULL)//更新命中和未命中
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
其他几个查询函数
robj *lookupKeyRead(redisDb *db, robj *key) {//这个值是判断有米有
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
expireIfNeeded(db,key);//write 先进行expire
return lookupKey(db,key,LOOKUP_NONE);
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReply(c,reply);
return o;
}
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyWrite(c->db, key);
if (!o) addReply(c,reply);
return o;
}
添加覆盖
添加或者覆盖总的说来就是set。这些函数必须的注意的就是,调用的时候必须要保证是添加,还到底是覆盖。必须有明确原来key是否存在
void dbAdd(redisDb *db, robj *key, robj *val) {//增加
sds copy = sdsdup(key->ptr);//copy key
int retval = dictAdd(db->dict, copy, val);//dict add
serverAssertWithInfo(NULL,key,retval == DICT_OK);//程序会被终止在这个keyexist
if (val->type == OBJ_LIST) signalListAsReady(db, key);// 更新ready
if (server.cluster_enabled) slotToKeyAdd(key);
}
void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);//必须找到
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
robj *old = dictGetVal(de);
int saved_lru = old->lru;
dictReplace(db->dict, key->ptr, val);//replace
val->lru = saved_lru;
/* LFU should be not only copied but also updated
* when a key is overwritten. */
updateLFU(val);
} else {
dictReplace(db->dict, key->ptr, val);// replace
}
}
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) { //判断存在不
dbAdd(db,key,val);//直接add
} else {
dbOverwrite(db,key,val); //覆盖写
}
incrRefCount(val);//增加引用
removeExpire(db,key);//移除expire
signalModifiedKey(db,key);//通知修改
}
删除
对于后面版本的redis提供了异步删除的工作。这个异步删除和我们前面异步删除key value没什么本质上的区别
dbSyncDelete
这个玩意很简单直接干就行了
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//直接删除删除expire
if (dictDelete(db->dict,key->ptr) == DICT_OK) {//直接删除dict
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
dbSyncDelete
这个函数就是判断下value的长度,看下是不是符合异步删除的条件这个条件的定义是#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//删除expire
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr);//从dict里面移除 但是不释放内存
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);//判断value的长度
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
* possible. This rarely happens, however sometimes the implementation
* of parts of the Redis core may call incrRefCount() to protect
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {//这个个数还是有点多 并且没有其他引用了
atomicIncr(lazyfree_objects,1);//增加lazyfree_objects
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);//添加异步任务
dictSetVal(db->dict,de,NULL);//设置val 为null
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);//释放dictEntry 其中如果val没有被异步释放也会在这里释放
if (server.cluster_enabled) slotToKeyDel(key);//slot
return 1;
} else {
return 0;
}
}
dbDelete
就是调用删除
int dbDelete(redisDb *db, robj *key) {
return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
清空数据库
清空数据库其实和删除没啥区别。就是分一个同步删除和异步删除。本质上就是清空hashtable
emptyDbAsync
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;//取出expires 和 dict
db->dict = dictCreate(&dbDictType,NULL);// 清空的时候把原来的重新创建过
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);//创建任务
}
emptyDb
清空的时候-1代表全部。其他的代表具体的db
long long emptyDb(int dbnum, int flags, void(callback)(void*)) { //清空数据库
int j, async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {// -1 代表全部 其他代表 清空的id
errno = EINVAL;
return -1;
}
for (j = 0; j < server.dbnum; j++) {
if (dbnum != -1 && dbnum != j) continue;
removed += dictSize(server.db[j].dict);
if (async) {
emptyDbAsync(&server.db[j]);//异步清空
} else {
dictEmpty(server.db[j].dict,callback);//清空dict
dictEmpty(server.db[j].expires,callback);//清空expire
}
}
if (server.cluster_enabled) {
if (async) {
slotToKeyFlushAsync();
} else {
slotToKeyFlush();
}
}
if (dbnum == -1) flushSlaveKeysWithExpireList();
return removed;
}
总结
我们粗略的看了下redisDb。看了dict和expires。
我们可以发现其实在db层面上是完全没有关心存储的类型的。所以我们的对于类型方面的东西需要在上层处理。
db还有几部分没有关注到的blocking_key这些和slot这些。这里我会在后面的对应的特性去关注。现在的关注点就是把它看成一个单纯的kv缓存,米有什么主从,集群啥的。