Skiplist原理
skiplist
的效率可以和平衡树媲美,平均O(logN)
,最坏O(N)
的查询效率,但是用skiplist
实现比平衡树实现简单,所以很多程序用跳跃链表来代替平衡树。
内存屏障
内存屏障,也称内存栅栏,内存栅障,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。更多细节
leveldb
支持多线程操作,但skiplist
中并没有使用锁,信号量来实现同步控制,因为锁机制导致某个线程占有资源,其他线程阻塞的情况,导致系统资源利用率降低。所以leveldb
采用的是内存屏障来实现同步机制。
class AtomicPointer {
private:
void* rep_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* p) : rep_(p) {}
inline void* NoBarrier_Load() const { return rep_; }
inline void NoBarrier_Store(void* v) { rep_ = v; }
inline void* Acquire_Load() const {
void* result = rep_;
MemoryBarrier();//内存屏障
return result;
}
inline void Release_Store(void* v) {
MemoryBarrier();
rep_ = v;
}
};
关于内存屏障的实现,各个平台不同。。我就没有具体研究了,感兴趣的朋友可以去自己搜搜资料。
Node
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1]; // 占位用
};
刚开始看到port::AtomicPointer next_[1];
这句我整个人是懵逼的=。=,心想这在函数调用中不是特么越界么,后来发现大佬还是大佬,我还是too young too simple,看下面这个申请Node
的函数
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
这样就可以理解了,实际上这是一个技巧性的作法,利用port::AtomicPointer next_[1]
来占位,然后在申请内存的时候,实际的数组大小和本节点的的height一致。
插入&&查询
leveldb
中为skiplist
自身仅实现了insert
和contain
两个公开的函数接口。
查询逻辑相当简单,直接上代码
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, NULL);
if (x != NULL && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
leveldb
中skiplist
的核心操作当属insert
先上代码然后再具体分析
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
思想是,在插入高度为height
的节点时,首先要找到这个节点height
个前节点,然后插入就和普通的链表插入一样。
max_height更新的两种情况:
1. 读到旧的max_height_,而后写线程更新了max_height_并正在进行或完成节点插入
2. 读到新的max_height_,而写线程正在进行或完成节点插入
对于上述两种情况,作者说明并不存在并发问题,为何呢?
不存在并发的关键在于最后的for
循环中,由最下层向上插入可以保证当前层一旦插入后,其下层状态已经更新。 SetNext为原子操作,保证读线程在调用Next查找节点时不存在并发问题。
当然,多个写之间的并发skiplist
时非线程安全的,在leveldb
的memtable
中采用了另外的技巧来处理写并发问题。
同样leveldb
中没有提供显式的删除节点操作,但实际上是可以删除的,因为当我们插入数据时,key
的形式为key:value
,当删除数据时,则插入key:deleted
类似删除的标记,等到Compaction
再删除。
leveldb
中还自己封装了一个双向迭代器,很简单的实现,不具体分析了。