5.1 基本原理
SkipList称之为跳表,可实现Log(n)级别的插入、删除。跳表是平衡树的一种替代方案,和平衡树不同的是,跳表并不保证严格的“平衡性”,而是采用更为随性的方法:随机平衡算法。
关于SkipList的完整介绍请参见跳表(SkipList),这里借用几幅图做简要说明:
- 图1.1中红色部分为初始化状态,即head各个level中next节点均为NULL。
- 跳表是分层的,由下往上分别为1、2、3...,因此需要分层算法。
- 跳表中每一层的数据都是按顺序存储的,因此需要Compactor。
- 查找动作由最上层开始依序查找,直到找到数据或查找失败。
- 插入动作仅影响插入位置前后节点,对其他节点无影响。
- 删除动作仅影响插入位置前后节点,对其他节点无影响。
5.2 分层算法
分层算法决定了数据插入的Level,SkipList的平衡性如何全权由分层算法决定。极端情况下,假设SkipList只有Level-0层,SkipList将弱化成自排序List。此时查找、插入、删除的时间复杂度均为O(n),而非O(Log(n))。
LevelDB中的分层算法实现如下(leveldb::skiplist::RandomHeight())
// enum { kMaxHeight = 12 };
template<typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight()
{
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}
kMaxHeight 代表Skiplist的最大高度,即最多允许存在多少层,为关键参数,与性能直接相关。修改kMaxHeight ,在数值变小时,性能上有明显下降,但当数值增大时,甚至增大到10000时,和默认的kMaxHeight =12相比仍旧无明显差异,内存使用上也是如此。
为何如此?关键在于while循环中的判定条件:height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)。除了kMaxHeight 判定外,(rnd_.Next() % kBranching) == 0)判定使得上层节点的数量约为下层的1/4。那么,当设定MaxHeight=12时,根节点为1时,约可均匀容纳Key的数量为4^11=4194304(约为400W)。因此,当单独增大MaxHeight时,并不会使得SkipList的层级提升。MaxHeight=12为经验值,在百万数据规模时,尤为适用。
5.3 比较器(Compactor)
如同二叉树,Skiplist也是有序的,键值比较需由比较器(Compactor)完成。SkipList对Compactor的要求只有一点:()操作符重载,格式如下:
//a<b返回值小于0,a>b返回值大于0,a==b返回值为0
int operator()(const Key& a, const Key& b) const;
SkipList定义中,Key与Compactor均为模板参数,因而Compactor亦由使用者实现。
template <typename Key, class Comparator>
class SkipList
{
....
}
LevelDB中存在一个Compactor抽象类,但该抽象类并没有重载()操作符,至于Compacotr如何使用及Compactor抽象类和此处的Compactor的关系如何请参见MemTable一节。
5.4 查找、插入、删除
LevelDB中实现的SkipList并无删除行为,这由其业务特性决定,故此处不提。
查找、插入亦即读、写行为。由图1.2可知,插入首先需完成一次查找动作,随后在指定位置上完成一次插入行为。
5.4.1 查找
LevelDB中的查找、插入行为几乎做到了“无锁”并发,这一点是非常可取的。关于这一点,是本次备忘的重点。先来看查找:
template<typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) const
{
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
}
else {
if (prev != NULL) prev[level] = x;
if (level == 0) {
return next;
}
else {
// Switch to next list
level--;
}
}
}
}
实现并无特别之处:由最上层开始查找,一直查找到Level-0。找到大于等于指定Key值的数据,如不存在返回NULL。来看SkipList的Node结构:
template<typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1]; //看NewNode代码,实际大小为node height
};
Node有两个成员变量,Key及next_数组。Key当然是节点数据,next_数组(注意其类型为AtomicPointer )则指向了其所在层及之下各个层中的下一个节点(参见图1.1)。Next_数组的实际大小和该节点的height一致,来看Node的工厂方法NewNode:
template<typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::NewNode(const Key& key, int height)
{
//此处分别的为对象体,需要边界对齐
char* mem = arena_->AllocateAligned( sizeof(Node) +
sizeof(port::AtomicPointer) * (height - 1));
//c++ placement显示调用构造函数,并不常见。
return new (mem) Node(key);
}
Node的两组方法:SetNext/Next、NoBarrier_SetNext/NoBarrier_Next,用于读写指定层的下一节点指针,前者并发安全、后者非并发安全。
5.4.2 插入
template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key)
{
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight())
{
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (NULL), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since NULL sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
插入行为主要修改两类数据:max_height_及所有level中前一节点的next指针。
max_height_没有任何并发保护,关于此处作者注释讲的很清楚:
读线程在读到新的max_height_同时,对应的层级指针(new level pointer from head_)可能是原有的NULL,也有可能是部分更新的层级指针。如果是前者将直接跳到下一level继续查找,如果是后者,新插入的节点将被启用。
随后节点插入方是将无锁并发变为现实:
- 首先更新插入节点的next指针,此处无并发问题。
- 修改插入位置前一节点的next指针,此处采用SetNext处理并发。
- 由最下层向上插入可以保证当前层一旦插入后,其下层已更新完毕并可用。
当然,多个写之间的并发SkipList时非线程安全的,在LevelDB的MemTable中采用了另外的技巧来处理写并发问题。
5.5 总结
SkipList的功能定位和B-Tree类似,但实现更简单。LevelDB选用SkipList一来和SkipList的高效、简洁相关,二来SkipList也仅存在MemTable一种简单应用场景。下一节,让我们聊聊MemTable。
转载请注明:【随安居士】http://www.jianshu.com/p/6624befde844