参考链接:
https://blog.csdn.net/harleylau/article/details/80534159
http://zhangtielei.com/posts/blog-redis-quicklist.html
https://blog.csdn.net/qq_30085733/article/details/79914295
http://czrzchao.com/redisSourceQuicklist
https://blog.csdn.net/qq_30085733/article/details/79914295
在redis3.2版本之前,redis的列表对象的底层数据结构是ziplist和linkedlist。当列表对象中元素的长度比较小或者数量比较少的时候,采用ziplist来存储,当列表对象中元素的长度比较大或者数量比较多的时候,则会转而使用双向列表linkedlist来存储。但是最新的redis版本中,redis的list的底层数据结构统一为quicklist,quicklist是ziplist和linkedlist的结合,其本质上是一个双向链表,链表中的每一个元素是ziplist。其数据结构如下:
typedef struct quicklist {
quicklistNode *head; // 指向quicklist的头部
quicklistNode *tail; // 指向quicklist的尾部
unsigned long count; // 列表中所有数据项的个数总和
unsigned int len; // quicklist节点的个数,即ziplist的个数
int fill : 16; // ziplist大小限定,由list-max-ziplist-size给定
unsigned int compress : 16; // 节点压缩深度设置(首尾两端不被压缩节点的个数),由list-compress-depth给定,该值为0表示不允许对这个quicklist进行压缩
} quicklist;
其中,len表示quicklist元素的个数,即quicklist包含多少个ziplist节点(quicklist的每一个节点都是ziplist)。由于每个ziplist又是由多个数据节点组成的,所以count代表quicklist中一共有多少个数据节点。fill和compress决定了每个ziplist的属性。
redis会利用LZF压缩算法对quicklist的节点进行压缩操作,原因在于:list的设计目的是能够存放很长的数据列表,当列表很长时,必然会占用很高的内存空间,且list中最容易访问的是两端的数据,中间的数据访问率较低,如果将不怎么被访问的数据进行压缩,只在需要时才进行解压缩来访问原始数据,那么就可以在尽量少影响业务流畅度的基础上减少数据存储的空间。基于此,redis提出了compress depth这个概念--保持首尾compress depth个quickliostNode不进行压缩,对中间的quicklistNode进行压缩处理。
compress:这个字段是修饰整个quicklist的,代表首尾两端不被压缩的节点的个数。节点压缩深度设置,存放
list-compress-depth参数的值。
* 0 特殊值,表示不压缩
* 1 表示quicklist两端各有一个节点不压缩,中间的节点压缩
* 2 表示quicklist两端各有两个节点不压缩,中间的节点压缩
* 3 表示quicklist两端各有三个节点不压缩,中间的节点压缩
* 以此类推。
compress最大值:
#define COMPRESS_MAX (1 << 16)
由于quicklist结构包含了压缩表和链表,那么每个quicklistNode的大小就是一个需要仔细考量的点。如果单个quicklistNode存储的数据太多,就会影响插入效率;但是如果单个quicklistNode太小,就会变得跟链表一样造成空间浪费。
quicklist通过fill对单个quicklistNode的大小进行限制:
fill:ziplist大小设置,表示每个quicklist节点的元素大小限制,存放 list-max-ziplist-size参数的值。
fill可正可负,如果参数为正,表示按照数据项个数来限定每个节点中的元素个数,比如3代表每个节点中存放的元
素个数不能超过3(最大为32768个);反之,如果参数为负,表示按照字节数来限定每个节点中的元素个数,它只能
取-1~-5这五个数,其含义如下:
* -1 每个节点的ziplist字节大小不能超过4kb
* -2 每个节点的ziplist字节大小不能超过8kb(-2是Redis给出的默认值)
* -3 每个节点的ziplist字节大小不能超过16kb
* -4 每个节点的ziplist字节大小不能超过32kb
* -5 每个节点的ziplist字节大小不能超过64kb
-1~-5对应下面这个数组,例如:-1对应4096,-5对应65536.
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
fill为正数时的最大值:
#define FILL_MAX (1 << 15)
quicklist节点有两种状态,RAW状态和LZF状态。RAW状态就是原始的ziplist结构(解压缩状态);LZF状态则是利用LZF压缩算法对quicklist的节点进行压缩操作,其结构为quicklistLZF。
typedef struct quicklistNode {
struct quicklistNode *prev; // 指向上一个ziplist节点
struct quicklistNode *next; // 指向下一个ziplist节点
unsigned char *zl; // 数据指针,如果没有被压缩,就指向ziplist结构,反之指向quicklistLZF结构
unsigned int sz; // 表示指向ziplist结构的总长度(内存占用长度)
unsigned int count : 16; // 表示ziplist中的数据项个数
unsigned int encoding : 2; // 编码方式,1--ziplist,2--quicklistLZF
unsigned int container : 2; // 预留字段,存放数据的方式,1--NONE,2--ziplist
unsigned int recompress : 1; // 当我们使用类似lindex这样的命令查看了某一项本来压缩的数据时,需要把数据暂时解压,这时就设置recompress=1做一个标记,等有机会再把数据重新压缩。
unsigned int attempted_compress : 1; // 测试相关
unsigned int extra : 10; // 扩展字段,暂时没用
} quicklistNode;
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
quciklist的迭代器结构,指向迭代器节点元素(当前节点表示quicklist中的某个ziplist中的某个ziplistNode,也就是quicklistEntry最终指向的是这个ziplisyEntry,所以quicklist的迭代,是从当前entry依次往后遍历所有ziplistNode,而不是单单遍历每个quicklistNode):
// quicklist的迭代器结构
typedef struct quicklistIter {
const quicklist *quicklist; // 指向所在quicklist的指针
quicklistNode *current; // 指向当前节点的指针
unsigned char *zi; // 指向当前节点(ziplist中的·一个元素)
long offset; // 当前ziplist中的偏移地址
int direction; // 迭代器的方向
} quicklistIter;
// 表示quicklist节点中ziplist里的一个节点结构
typedef struct quicklistEntry {
const quicklist *quicklist; // 指向所在quicklist的指针
quicklistNode *node; // 指向当前节点的指针
unsigned char *zi; // 指向当前节点(ziplist中的·一个元素)
unsigned char *value; // 当前指向的ziplist中的节点的字符串值
long long longval; // 当前指向的ziplist中的节点的整型值
unsigned int sz; // 当前指向的ziplist中的节点的字节大小
int offset; // 当前指向的ziplist中的节点相对于ziplist的偏移量
} quicklistEntry;
quicklist结构如图所示:
quicklist的主要api:
- 创建quicklist:quicklist *quicklistCreate(void)
- quicklist节点创建:REDIS_STATIC quicklistNode *quicklistCreateNode(void)
- 删除quicklist:void quicklistRelease(quicklist *quicklist)
- quicklist的insert操作:insert操作类似push操作,不过是在某个迭代器指向的当前ziplistNode节点前/后insert一个新的ziplistNode元素。
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz)
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz) - quicklist的push操作:在quicklist的头/尾节点(ziplist)中push一个ziplistNode元素。
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz)
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) - quicklist的pop操作:int quicklistPop(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *slong)
- quicklist节点删除:void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry)
- quicklist压缩:quicklistCompress(_ql, _node)
quicklist重要接口实现讲解:
- quicklist压缩:quicklistCompress(_ql, _node)
/*
如果recompress=1,表示这个节点之前因为index等操作被“暂时”解压缩以查看原始数据,
此时可以对这个节点直接执行压缩操作,将这个节点重新压缩(不需要判断quicklist的
compress字段是否为0,因为是暂时解压缩的,所以原来的quicklist的这个节点一定是支持压缩的。)
*/
#define quicklistCompress(_ql, _node) \
do { \
if ((_node)->recompress) \
quicklistCompressNode((_node)); \
else \
__quicklistCompress((_ql), (_node)); \
} while (0)
内部接口:
/*
在弄懂这个函数之前,最重要的是明白quicklist->compress字段的意思,这个字段是修饰整个
quicklist的,代表首尾两端不被压缩的节点的个数,所以在执行compress之前,需要先判断
要被压缩的目标节点是否可以被压缩。
*/
//该函数应用很广,delete或者insert新元素都需要调用这个函数来进行quicklist属性适配
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistNode *node) {
/* If length is less than our compress depth (from both sides),
* we can't compress anything. */
if (!quicklistAllowsCompression(quicklist) ||
quicklist->len < (unsigned int)(quicklist->compress * 2))
return;
/* Iterate until we reach compress depth for both sides of the list.a
* Note: because we do length checks at the *top* of this function,
* we can skip explicit null checks below. Everything exists.
*/
quicklistNode *forward = quicklist->head;
quicklistNode *reverse = quicklist->tail;
int depth = 0;
int in_depth = 0;
while (depth++ < quicklist->compress) {
/*
从quicklist的头尾两边分别遍历,因为头部的compress个节点和尾部的compress个
节点都一定是非压缩的原始状态,为了确保满足compress字段要求,首先将这几个节点
强制再解压缩回原始状态。之所以进行这一步操作的原因是因为,inset或者delete操作
可能会将原来不处于compress depth中的元素进入compress depth,所以要将这些
元素变为正常的状态
*/
quicklistDecompressNode(forward);
quicklistDecompressNode(reverse);
/*
目标节点在头部或尾部的那compress个非压缩的节点中,不应该压缩这个节点,但是可能会压缩最靠
近非压缩深度外的两个节点(???),所以只是将in_depth标志置为1,表示目标节点在非压缩的
深度中(深度指不被压缩的数目),但不会return。
*/
if (forward == node || reverse == node)
in_depth = 1;
//quicklist的长度不及2*compress个,不进行下一步的压缩操作(异常情况,违背入口的判断条件)
if (forward == reverse)
return;
forward = forward->next;
reverse = reverse->prev;
}
//目标节点不在非压缩深度中(满足压缩条件),直接压缩
if (!in_depth)
quicklistCompressNode(node);
//为啥压缩深度外的那两个?不太清楚??不应该只压缩一个满足情况的node就好了么??
if (depth > 2) {
/* At this point, forward and reverse are one node beyond depth */
quicklistCompressNode(forward);
quicklistCompressNode(reverse);
}
}
/* If we previously used quicklistDecompressNodeForUse(), just recompress. */
/* 之前如果为了某个需要将某个原始节点暂时解压缩了,此时要将该节点恢复为压缩状态 */
#define quicklistRecompressOnly(_ql, _node) \
do { \
if ((_node)->recompress) \
quicklistCompressNode((_node)); \
} while (0)
- quicklist insert操作:
quicklist还能在指定的位置插入数据,quicklistInsertAfter和quicklistInsertBefore就是分别在指定位置后面和前面插入数据项。当然,和在头尾节点插入一样, 任意位置插入也是需要判断当前插入节点是否能够放得下当前的元素的,这边的情况会比头尾节点更为复杂,比如在当前节点放不下的时候,还需要检查一下旁边的节点是否能够放下这个数据,能够放下的话可以放置在旁边的节点上,如果也不行的话,就是需要新建一个ziplist节点。
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 0);
}
内部接口:
/* Insert a new entry before or after existing entry 'entry'.
*
* If after==1, the new value is inserted after 'entry', otherwise
* the new value is inserted before 'entry'. */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
if (!node) {
/* we have no reference node, so let's create only node in the list */
/* 这种情况下相当于quicklist中就没有任何一个quicklistNode */
D("No node given!");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
/* Populate accounting flags for easier boolean checks later */
//查看当前quicklistNode对应的ziplist中空间是否足够插入一个新元素
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1;
}
/* 如果要在当前Node的尾部后面插入新元素,考虑可否插入后一个ziplist中 */
if (after && (entry->offset == node->count)) {
D("At Tail of current ziplist");
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1;
}
}
/* 如果要在当前Node的头部前面插入新元素,考虑可否插入前一个ziplist中 */
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
full_prev = 1;
}
}
/* Now determine where and how to insert the new element */
/*
!full的情况对quicklist的compress depth没有影响(没有插入新的quicklistNode),
所以只需要对要插入的那个quicklistNode进行:解压缩(可能需要)-> 插入新元素 -> 重新压缩
(可能需要)就可以了
*/
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);//如果之前是压缩状态,需要先解压缩,insert完毕后,再重新压缩
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);//由于插入元素前可能解压缩了,所以可能需要重新回复压缩状态
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
/*
这种情况表示cur、pre、next ziplist都满了,需要重新构建一个新的quicklistNode,
在这个quicklistNode中插入目标元素。因为插入了新的quicklistNode,可能会影响
到compress depth,所以需要重新进行quicklistCompress处理(幸运的是,
__quicklistInsertNode会自动进行这一步处理)。
*/
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
/*
这种情况对应cur ziplist满了,但是想要插的元素不在ziplist头部的前面或者尾部的后面,
无法将元素插入pre或next ziplist。这种情况下,需要将cur ziplist在插入的位置
split成两个ziplist,再将这两个新的ziplist插入quicklist中。因为插入了新的
quicklistNode,所以需要重新进行quicklistCompress处理。
*/
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
//split操作会将切割后的两个ziplist分别存放在new_node和node中
/* If 'after'==1, returned node will have elements _after_ 'offset'.
* The returned node will have elements [OFFSET+1, END].
* The input node keeps elements [0, OFFSET].
*
* If 'after'==0, returned node will keep elements up to and including 'offset'.
* The returned node will have elements [0, OFFSET].
* The input node keeps elements [OFFSET+1, END].
*/
//new_node可能插入node的前方或后方,由after决定
//新的元素可能插入new_old的头部或者尾部,也由after决定
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
/*
这一步的merge是为了节省空间,将多个ziplist合并成满足fill要求的ziplist,
减少ziplist头部开销。这一步试图按以下顺序合并:
- (node->prev->prev, node->prev)
- (node->next, node->next->next)
- (node->prev, node)
- (node, node->next)
*/
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
/* Insert 'new_node' after 'old_node' if 'after' is 1.
* Insert 'new_node' before 'old_node' if 'after' is 0.
* Note: 'new_node' is *always* uncompressed, so if we assign it to
* head or tail, we do not need to uncompress it. */
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node, int after) {
if (after) {
new_node->prev = old_node;
if (old_node) {
new_node->next = old_node->next;
if (old_node->next)
old_node->next->prev = new_node;
old_node->next = new_node;
}
if (quicklist->tail == old_node)
quicklist->tail = new_node;
} else {
new_node->next = old_node;
if (old_node) {
new_node->prev = old_node->prev;
if (old_node->prev)
old_node->prev->next = new_node;
old_node->prev = new_node;
}
if (quicklist->head == old_node)
quicklist->head = new_node;
}
/* If this insert creates the only element so far, initialize head/tail. */
if (quicklist->len == 0) {
quicklist->head = quicklist->tail = new_node;
}
/*
无论是在old元素的before还是after插入一个新的元素,都会更改old元素的位置,为了
适应quicklist->compress字段设置的非压缩深度,需要调用quicklistCompress函数
重新更改old元素的压缩状态。
*/
if (old_node)
quicklistCompress(quicklist, old_node);
quicklist->len++;
}
- quicklist的push操作:
quicklist的push操作其实就是在双向列表的头节点或尾节点上插入一个新的元素。从上面我们知道,quicklist的每个节点都是一个ziplist,所以这个push操作就涉及到一个问题,当前节点的ziplist是否能够放进新元素。
———————————————————————————————————
1.如果ziplist能够放入新元素,即大小没有超过限制(list-max-ziplist-size),那么直接调用ziplistPush函数压入
2.如果ziplist不能放入新元素,则新建一个quicklist节点,即新的ziplist,新的数据项会被插入到新的ziplist,新的quicklist节点插入到原有的quicklist上
———————————————————————————————————
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
//PushHead操作是在quicklist的某个节点的ziplist中push值为ziplistNode的元素
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {//无法在原有quicklistNode中push元素,则新建一个只有这个元素的quicklist
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
//判断当前ziplist节点是否能插入的函数:
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254) //小于254时后一个节点的pre只有1字节,否则为5字节
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64) // 小于64字节当前节点的encoding为1
ziplist_overhead += 1;
else if (likely(sz < 16384)) // 小于16384 encoding为2字节
ziplist_overhead += 2;
else // encoding为5字节
ziplist_overhead += 5;
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead; // 忽略了连锁更新的情况
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1; // 校验fill为负数是否超过单存储限制
else if (!sizeMeetsSafetyLimit(new_sz)) // 校验单个节点是否超过8kb,主要防止fill为正数时单个节点内存过大
return 0;
else if ((int)node->count < fill) // fill为正数是否超过存储限制,因为打算插入一个新元素,所以只能用<进行判断,不能用<=进行判断。
return 1;
else
return 0;
}
- quicklist的pop操作:
外部接口:
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (quicklist->count == 0)
return 0;
int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
_quicklistSaver);
if (data)
*data = vstr;
if (slong)
*slong = vlong;
if (sz)
*sz = vlen;
return ret;
}
内部接口:
/* pop from quicklist and return result in 'data' ptr. Value of 'data'
* is the return value of 'saver' function pointer if the data is NOT a number.
*
* If the quicklist element is a long long, then the return value is returned in
* 'sval'.
*
* Return value of 0 means no elements available.
* Return value of 1 means check 'data' and 'sval' for values.
* If 'data' is set, use 'data' and 'sz'. Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
int pos = (where == QUICKLIST_HEAD) ? 0 : -1;
if (quicklist->count == 0)
return 0;
if (data)
*data = NULL;
if (sz)
*sz = 0;
if (sval)
*sval = -123456789;
quicklistNode *node;
if (where == QUICKLIST_HEAD && quicklist->head) {
node = quicklist->head;
} else if (where == QUICKLIST_TAIL && quicklist->tail) {
node = quicklist->tail;
} else {
return 0;
}
p = ziplistIndex(node->zl, pos);
if (ziplistGet(p, &vstr, &vlen, &vlong)) {
if (vstr) {
if (data)
*data = saver(vstr, vlen);
if (sz)
*sz = vlen;
} else {
if (data)
*data = NULL;
if (sval)
*sval = vlong;
}
quicklistDelIndex(quicklist, node, &p);
return 1;
}
return 0;
}
- quicklist节点删除:
/* Delete one element represented by 'entry'
*
* 'entry' stores enough metadata to delete the proper position in
* the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
/* after delete, the zi is now invalid for any future usage. */
iter->zi = NULL;
/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (iter->direction == AL_START_HEAD) {
iter->current = next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
iter->current = prev;
iter->offset = -1;
}
}
/* else if (!deleted_node), no changes needed.
* we already reset iter->zi above, and the existing iter->offset
* doesn't move again because:
* - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
* - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
* if we deleted the last element at offet N and now
* length of this ziplist is N-1, the next call into
* quicklistNext() will jump to the next node. */
}
/* Delete one entry from list given the node for the entry and a pointer
* to the entry in the node.
*
* Note: quicklistDelIndex() *requires* uncompressed nodes because you
* already had to get *p from an uncompressed node somewhere.
*
* Returns 1 if the entire node was deleted, 0 if node still exists.
* Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
node->zl = ziplistDelete(node->zl, p);
node->count--;
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);
} else {
quicklistNodeUpdateSz(node);
}
quicklist->count--;
/* If we deleted the node, the original node is no longer valid */
return gone ? 1 : 0;
}
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
quicklistNode *node) {
if (node->next)
node->next->prev = node->prev;
if (node->prev)
node->prev->next = node->next;
if (node == quicklist->tail) {
quicklist->tail = node->prev;
}
if (node == quicklist->head) {
quicklist->head = node->next;
}
/* If we deleted a node within our compress depth, we
* now have compressed nodes needing to be decompressed. */
__quicklistCompress(quicklist, NULL);
quicklist->count -= node->count;
zfree(node->zl);
zfree(node);
quicklist->len--;
}
quicklist迭代器相关操作:
类似stl的迭代器,在执行insert操作时,不能使用原来的迭代器,除非重新建立新的迭代器。
当使用quicklistDelEntry()来删除元素时,迭代器有时会失效。
迭代器失效的原因都是因为底层内存重新分配,迭代器原本指向的地址变为无效地址。
- 初始化迭代器:
/* Returns a quicklist iterator 'iter'. After the initialization every
* call to quicklistNext() will return the next element of the quicklist. */
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
quicklistIter *iter;
iter = zmalloc(sizeof(*iter));
if (direction == AL_START_HEAD) {
iter->current = quicklist->head;
iter->offset = 0;
} else if (direction == AL_START_TAIL) {
iter->current = quicklist->tail;
iter->offset = -1;
}
iter->direction = direction;
iter->quicklist = quicklist;
iter->zi = NULL;
return iter;
}
- 获取idx处的迭代器
/* Initialize an iterator at a specific offset 'idx' and make the iterator
* return nodes in 'direction' direction. */
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
const int direction,
const long long idx) {
quicklistEntry entry;
if (quicklistIndex(quicklist, idx, &entry)) {
quicklistIter *base = quicklistGetIterator(quicklist, direction);
base->zi = NULL;//?这个为什么不用entry->zi,同next结合使用
base->current = entry.node;
base->offset = entry.offset;
return base;
} else {
return NULL;
}
}
- 迭代器next操作:
结合quicklistNext和quicklistGetIteratorAtIdx我们可以发现,当利用quicklistGetIteratorAtIdx初始化idx的迭代器时,此时itr->zi=NULL,只有进行了一次next后,itr->zi才会指向对应idx的entryNode。之后,对这个itr再执行quicklistNext操作,此时itr和entry都会指向下一个节点,而不再是NULL。
注意点:迭代器next操作时会进行decompress操作获得原始数据,因此稍后需要对这些数据进行重新压缩。
/* Get next element in iterator.
*
* Note: You must NOT insert into the list while iterating over it.
* You *may* delete from the list while iterating using the
* quicklistDelEntry() function.
* If you insert into the quicklist while iterating, you should
* re-create the iterator after your addition.
*
* iter = quicklistGetIterator(quicklist,<direction>);
* quicklistEntry entry;
* while (quicklistNext(iter, &entry)) {
* if (entry.value)
* [[ use entry.value with entry.sz ]]
* else
* [[ use entry.longval ]]
* }
*
* Populates 'entry' with values for this iteration.
* Returns 0 when iteration is complete or if iteration not possible.
* If return value is 0, the contents of 'entry' are not valid.
*/
int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
initEntry(entry);
if (!iter) {
D("Returning because no iter!");
return 0;
}
entry->quicklist = iter->quicklist;
entry->node = iter->current;
if (!iter->current) {
D("Returning because current node is NULL")
return 0;
}
unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
int offset_update = 0;
if (!iter->zi) {
/* If !zi, use current index. */
quicklistDecompressNodeForUse(iter->current);
iter->zi = ziplistIndex(iter->current->zl, iter->offset);
} else {
/* else, use existing iterator offset and get prev/next as necessary. */
if (iter->direction == AL_START_HEAD) {
nextFn = ziplistNext;
offset_update = 1;
} else if (iter->direction == AL_START_TAIL) {
nextFn = ziplistPrev;
offset_update = -1;
}
iter->zi = nextFn(iter->current->zl, iter->zi);
iter->offset += offset_update;
}
entry->zi = iter->zi;
entry->offset = iter->offset;
if (iter->zi) {
/* Populate value from existing ziplist position */
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
return 1;
} else {
/* We ran out of ziplist entries.
* Pick next node, update offset, then re-run retrieval. */
quicklistCompress(iter->quicklist, iter->current);
if (iter->direction == AL_START_HEAD) {
/* Forward traversal */
D("Jumping to start of next node");
iter->current = iter->current->next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
/* Reverse traversal */
D("Jumping to end of previous node");
iter->current = iter->current->prev;
iter->offset = -1;
}
iter->zi = NULL;
return quicklistNext(iter, entry);
}
}
- 根据idx获取quicklist中的元素
/* Populate 'entry' with the element at the specified zero-based index
* where 0 is the head, 1 is the element next to head
* and so on. Negative integers are used in order to count
* from the tail, -1 is the last element, -2 the penultimate
* and so on. If the index is out of range 0 is returned.
*
* Returns 1 if element found
* Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
quicklistEntry *entry) {
quicklistNode *n;
unsigned long long accum = 0;
unsigned long long index;
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
initEntry(entry);
entry->quicklist = quicklist;
if (!forward) {
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
if (index >= quicklist->count)
return 0;
while (likely(n)) {
if ((accum + n->count) > index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;
}
}
if (!n)
return 0;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
entry->node = n;
if (forward) {
/* forward = normal head-to-tail offset. */
entry->offset = index - accum;
} else {
/* reverse = need negative offset for tail-to-head, so undo
* the result of the original if (index < 0) above. */
entry->offset = (-index) - 1 + accum;
}
quicklistDecompressNodeForUse(entry->node);
entry->zi = ziplistIndex(entry->node->zl, entry->offset);
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
/* The caller will use our result, so we don't re-compress here.
* The caller can recompress or delete the node as needed. */
return 1;
}
quicklist查漏补缺:
- GCC关键字__builtin_expect
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
__builtin_expect() 是 GCC (version >= 2.96)提供给程序员使用的,目的是将“分支转移”的信息提供给编译器,这样编译器可以对代码进行优化,以减少指令跳转带来的性能下降。__builtin_expect((x),1) 表示 x 的值为真的可能性更大;
__builtin_expect((x),0) 表示 x 的值为假的可能性更大。
也就是说,使用 likely() ,执行 if 后面的语句 的机会更大,使用 unlikely(),执行 else 后面的语句的机会更大。通过这种方式,编译器在编译过程中,会将可能性更大的代码紧跟着起面的代码,从而减少指令跳转带来的性能上的下降。