引言
Redis作为一个开源的NoSql数据库具有很大的使用率。最近闲来读一下Redis的源代码。学习一下各路大神的编程思想。是对自己很大的帮助。阅读的时候写个文章。记录一下自己的收货,也给大家一起分享分享。
HashTable
HashTable作为一个大家常用的kv数据存储容器,大家肯定都是特别的熟悉。一般的实现思想都是对key进行hash处理。根据处理得到的的keindex找到存储的位置。一般来说如果当前keyindex上有数据,那么久把当前属于同一keyindex的数据做成一个链表保存。当达到一定的阈值那么就对HashTable进行扩容,然后可以所有的数据进行重新计算keyindex,然后存储数据。查找的时候就对key计算keyindex,在keyindex下面的链表里面找出和key一样的数据。
所有一个基本的HashTable就是拥有一个数组存放对keyindex的数据,这个数组的每个元素就存放着一个链表。但是这样一个设计一般来说是米有问题的。但是如果在数据特别特别大的时候。就会有一个问题。当我们插入一个数据的时候。我们需要对这个HashTable进行扩容。这个时候根据处理的线性来说。如果我们的数据越大,那么我们所需要的重新hash的时间就越长。这个处理的时候,会占用掉我们的线程,并且会没有特殊处理的话。该table是不能再进行操作的。这个时候就会造成性能的急剧下降。这个是对于高性能来说是不能容忍的。所以我们来看下Redis是怎么处理的。
基本数据结构
typedef struct dictEntry {
void *key; //key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; //value
struct dictEntry *next; //next数据
} dictEntry;
typedef struct dictType {//dict对key value处理方法
unsigned int (*hashFunction)(const void *key);//hash
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
typedef struct dictht {//dict的hashtable 这个hashtable就可以对应成为一个我们普通的HashTable
dictEntry **table; //存储记录的table
unsigned long size; //记录table数组的大小
unsigned long sizemask;//用于计算keyindex的具体位置
unsigned long used;//当前ht的存储数据的数量
} dictht;
typedef struct dict {
dictType *type;//key value的操作方法
void *privdata;//创建时候赋予的私有数据
dictht ht[2]; //两个HashTable 是两个
long rehashidx; //当前rehash进行的位置
int iterators; //有几个迭代器
} dict;
dictEntry 是一个链表。它存放着相同keyindex的数据
dictType 作为对HashTable key value数据的操作。
dictht 一个简单的HashTable和我们前面所讨论的基本HashTable一致
dict 使用dictht来实现Redis需要的HashTable。主要特点是拥有两个存放数据的HashTable
HashTable Add
读懂一个代码就要读懂他的流程。所以我们来从流程看代码。
我们来看一下如果我们一般的HashTable的增加一个元素的操作会是啥喃,一般来说就是下面的两步。
1.判断HashTable的存储空间是否足够。不够就扩容,不能扩容就GG。
2.计算Key的index,然后找到位置。一般来说找到一个一样的key就代表错误。正确的话就在这个位置上的列表插入我们的key vlaue数据。(一般来说如果是需要新建一个存放key value的数据结构,那就建,如果是预分配的那就用,没啥特别的)
咱们这种方法在Redis里面也是一样的实现方法。所以喃我们就来看扩容。
HashTable扩容
static unsigned long _dictNextPower(unsigned long size)
{
/*
保证一下每次返回的size是2的整数和最大size的限制
*/
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
//检查有米有在进行rehash
#define dictIsRehashing(d) ((d)->rehashidx != -1)
/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);//计算出一个2标准的size
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)//新的size要比原来的已经用的多
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;//没变化的size就没啥改变头
/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize; //把这个新的大小的HashTable初始化
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {//这种是初始情况下当前table都还没数据
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;//有数据的话就把它给dict的第二个HashTable等于就是把第二个HashTable初始化下
d->rehashidx = 0;//标记当前rehashidx
return DICT_OK;
}
/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
*
* Note that a rehashing step consists in moving a bucket (that may have more
* than one key as we use chaining) from the old to the new hash table, however
* since part of the hash table may be composed of empty spaces, it is not
* guaranteed that this function will rehash even a single bucket, since it
* will visit at max N*10 empty buckets in total, otherwise the amount of
* work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. *///代表发现空节点的次数
if (!dictIsRehashing(d)) return 0;//当前没有进行rehash操作
while(n-- && d->ht[0].used != 0) {//判断还有没有移动次数和还有没有数据
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);//小了就是内存错误了
while(d->ht[0].table[d->rehashidx] == NULL) {//这是一个空节点
d->rehashidx++;//往前走一个index
if (--empty_visits == 0) return 1;//空节点走完了停止rehash
}
de = d->ht[0].table[d->rehashidx];//拿出这个节点的list
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {//找到这个节点的数据取出来放入到第二个HashTable去 然后指向下一个节点
unsigned int h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;//把当前节点设置我空
d->rehashidx++;//后移idx
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {//ht0里面使用的已经没了
zfree(d->ht[0].table);//进行释放
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;//标记rehash完成
return 0;
}
//返回1 代表rehash还没有完成 0 rehash完成了
/* More to rehash... */
return 1;
}
static void _dictRehashStep(dict *d) {//进行一步rehash操作
if (d->iterators == 0) dictRehash(d,1);
}
我们来看expand代码。他的操作很简单。如果当前的dict米有数据,就创建一个跟size相关的格式化大小的dictht放入ht0,如果是有数据。就创建来赋值给ht0。并且标记idx为0
rehash方法。增加了一个最大更新hash槽数量。更新每个hash槽的操作都是找出这个槽,遍历这个槽的list然后把取出来的数据放到新的hashtable。根据rehashidx来知道开始的位置。
根据这两个代码喃就发现expand就完成个空间分配。rehash完成转移操作(从0到1如果转移完成就又从1到0这个时候就rehash完成了)。rehash传入最大的转移插槽数。可以保证及时的性能。
然后各处塞满了_dictRehashStep 当然你也可以手动调用rehash。
HashTable 插入
static int _dictExpandIfNeeded(dict *d)//判断是不是要扩容
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
/* Returns the index of a free slot that can be populated with
* a hash entry for the given 'key'.
* If the key already exists, -1 is returned.
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
static int _dictKeyIndex(dict *d, const void *key)//查找keyindex
{
unsigned int h, idx, table;
dictEntry *he;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)//判断扩容不
return -1;
/* Compute the key hash value */
h = dictHashKey(d, key);//算出keyindex
for (table = 0; table <= 1; table++) {//看下这个插槽有没有
idx = h & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];//找到插槽
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))//判断有米有一样的key
return -1;
he = he->next;
}
if (!dictIsRehashing(d)) break;//只有在rehash的情况下ht1才会有数据
}
return idx;
}
dictEntry *dictAddRaw(dict *d, void *key)
{
int index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key)) == -1)//找出keyindex
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];//如果是rehash状态下就插入ht1否则ht0
entry = zmalloc(sizeof(*entry));//分配个保存空间
entry->next = ht->table[index];//维持list
ht->table[index] = entry;
ht->used++;//增加使用数目
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key);//分配给保存空间
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);//保存数据
return DICT_OK;
}
插入操作是当前是rehash状态下的话就会被插入到ht1的那个HashTable(因为插入0的话0的有些插槽的数据已经被rehash过了<idx > keyindex>)其他操作都是常规操作。
后面的删除修改就不在叙述了。
总结
Redis实现了一个常规的HashTable即dictht 他拥有散列和保存功能。dict 使用两个dictht 他们的作用分别是
ht0:在没有rehash的时候,他作为唯一保存数据的位置,所有的插入,删除,查找等操作都只在他上面运行。在进行rehash的时候,他是作为一个元数据往ht1里面写数据。没有插入功能,但是具有删除和查找功能。
ht1:在没有rehash的时候他是个NULL所有啥都不会干。在rehash的时候,接收ht0来的元数据。这时候插入功能只会在ht1上。查找和删除功能也有。当rehash完成的时候,他会被赋值给ht0然后自己被情况。
Redis通过两个dictht来实现一次不全部完成rehash,但是也保证一个HashTable的全部功能。通过穿插的操作来碎片化rehash的时间,保证及时性。总体来说还是一个很值得学习的处理方式。