虽然网上关于leveldb源码分析的文章挺多,但还是想写下来记录自己的再次学习过程。早几年前看过一些leveldb的实现,但因为当时没有写博客记录下来,所以这次再分析leveldb源码时,把内容整理成文,以后回看自己记录的东西会方便些。后面打算再写几篇,之后就不再更新了。
本来是准备从头开始分析的,但一般对于数据库,读和写是常用的接口,从读写开始分析整个流程,包括启动时做的事情,宕机时数据库的恢复,读和写放大,以及可能的一致性等,对于性能优化这块,可能要参考链接中的资料,leveldb本身是个单机数据库,不包含网络这块,以及主备一致性等其他额外的功能。
通过leveldb,一方面是学习设计实现,比如根据list+hash实现lru算法,skiplist使用,多线程读写,布隆过滤器,以及关键的lsm模型等,当然也可以看到其中的bug及性能方面的问题,这些只有真正遇到踩坑时才会知道。
首先在开始前,先说明一下用到的key,分为InternalKey
和LookupKey
,部分代码如下:
134 class InternalKey {
135 private:
136 std::string rep_;
137 public:
138 InternalKey() { } // Leave rep_ as empty to indicate it is invalid
139 InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
140 AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
141 }
12 static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
13 assert(seq <= kMaxSequenceNumber);
14 assert(t <= kValueTypeForSeek);
15 return (seq << 8) | t;
16 }
17
18 void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
19 result->append(key.user_key.data(), key.user_key.size());
20 PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
21 }
70 struct ParsedInternalKey {//分解出user_key+seq+type
71 Slice user_key;
72 SequenceNumber sequence;
73 ValueType type;
76 ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
77 : user_key(u), sequence(seq), type(t) { }
79 };
178 // A helper class useful for DBImpl::Get()
179 class LookupKey {
180 public:
181 // Initialize *this for looking up user_key at a snapshot with
182 // the specified sequence number.
183 LookupKey(const Slice& user_key, SequenceNumber sequence);
184
185 ~LookupKey();
186
187 // Return a key suitable for lookup in a MemTable.
188 Slice memtable_key() const { return Slice(start_, end_ - start_); }
189
190 // Return an internal key (suitable for passing to an internal iterator)
191 Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
192
193 // Return the user key
194 Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
195
196 private:
204 const char* start_;
205 const char* kstart_;
206 const char* end_;
207 char space_[200]; // Avoid allocation for short keys
208
209 // No copying allowed
210 LookupKey(const LookupKey&);
211 void operator=(const LookupKey&);
212 };
121 LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
122 size_t usize = user_key.size();
123 size_t needed = usize + 13; // A conservative estimate
124 char* dst;
125 if (needed <= sizeof(space_)) {
126 dst = space_;
127 } else {
128 dst = new char[needed];
129 }
130 start_ = dst;
131 dst = EncodeVarint32(dst, usize + 8);
132 kstart_ = dst;
133 memcpy(dst, user_key.data(), usize);
134 dst += usize;
135 EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
136 dst += 8;
137 end_ = dst;
138 }
InternalKey
由user_key+7字节的seq_number+1字节的type
组成;这样做是有一定的好处的,会在后面说明。
以上的实现也比较简单,其中type
只有两种类型,kTypeDeletion
和kTypeValue
,LookupKey
构造时,会编码成 key 的大小 + key 的内容 + seq/type,与 write 时写入 memtable 的条目中 key 部分的编码格式一致:
125 virtual void Put(const Slice& key, const Slice& value) {
126 mem_->Add(sequence_, kTypeValue, key, value);
127 sequence_++;
128 }
82 void MemTable::Add(SequenceNumber s, ValueType type,
83 const Slice& key,
84 const Slice& value) {
85 // Format of an entry is concatenation of:
86 // key_size : varint32 of internal_key.size()
87 // key bytes : char[internal_key.size()]
88 // value_size : varint32 of value.size()
89 // value bytes : char[value.size()]
90 size_t key_size = key.size();
91 size_t val_size = value.size();
92 size_t internal_key_size = key_size + 8;//InternalKey的可变长度(key+seq+type)
93 const size_t encoded_len =
94 VarintLength(internal_key_size) + internal_key_size +
95 VarintLength(val_size) + val_size;//整条内容长度
96 char* buf = arena_.Allocate(encoded_len);
97 char* p = EncodeVarint32(buf, internal_key_size);//InternalKey的大小
98 memcpy(p, key.data(), key_size);//key内容
99 p += key_size;
100 EncodeFixed64(p, (s << 8) | type);//seq_number +type内容
101 p += 8;
102 p = EncodeVarint32(p, val_size);//value大小
103 memcpy(p, value.data(), val_size);//value内容
104 assert(p + val_size == buf + encoded_len);
105 table_.Insert(buf);
106 }
以上是把某条记录加到skiplist中,这里不管是删除还是修改增加,统一为插入新的记录,后期会进行合并保留最新的。
下面是在写入时的合并类,部分代码:
1487 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1488 WriteBatch batch;
1489 batch.Put(key, value);
1490 return Write(opt, &batch);
1491 }
1492
1493 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1494 WriteBatch batch;
1495 batch.Delete(key);
1496 return Write(opt, &batch);
1497 }
26 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
27 static const size_t kHeader = 12;
32 class LEVELDB_EXPORT WriteBatch {
33 public:
34 WriteBatch();
73 private:
74 friend class WriteBatchInternal;
75
76 std::string rep_; // See comment in write_batch.cc for the format of rep_
77 };
102 void WriteBatch::Put(const Slice& key, const Slice& value) {
103 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
104 rep_.push_back(static_cast<char>(kTypeValue));
105 PutLengthPrefixedSlice(&rep_, key);
106 PutLengthPrefixedSlice(&rep_, value);
107 }
108
109 void WriteBatch::Delete(const Slice& key) {//删除也是追加记录
110 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
111 rep_.push_back(static_cast<char>(kTypeDeletion));
112 PutLengthPrefixedSlice(&rep_, key);
113 }
46 Status WriteBatch::Iterate(Handler* handler) const {
47 Slice input(rep_);
48 if (input.size() < kHeader) {
49 return Status::Corruption("malformed WriteBatch (too small)");
50 }
51
52 input.remove_prefix(kHeader);
53 Slice key, value;
54 int found = 0;
55 while (!input.empty()) {
56 found++;
57 char tag = input[0];
58 input.remove_prefix(1);
59 switch (tag) {
60 case kTypeValue://修改
61 if (GetLengthPrefixedSlice(&input, &key) &&
62 GetLengthPrefixedSlice(&input, &value)) {
63 handler->Put(key, value);
64 } else {
65 return Status::Corruption("bad WriteBatch Put");
66 }
67 break;
68 case kTypeDeletion://删除
69 if (GetLengthPrefixedSlice(&input, &key)) {
70 handler->Delete(key);
71 } else {
72 return Status::Corruption("bad WriteBatch Delete");
73 }
74 break;
75 default:
76 return Status::Corruption("unknown WriteBatch tag");
77 }
78 }
119 namespace {
120 class MemTableInserter : public WriteBatch::Handler {
121 public:
122 SequenceNumber sequence_;
123 MemTable* mem_;
124
125 virtual void Put(const Slice& key, const Slice& value) {
126 mem_->Add(sequence_, kTypeValue, key, value);
127 sequence_++;
128 }
129 virtual void Delete(const Slice& key) {
130 mem_->Add(sequence_, kTypeDeletion, key, Slice());
131 sequence_++;
132 }
133 };
134 } // namespace
136 Status WriteBatchInternal::InsertInto(const WriteBatch* b,
137 MemTable* memtable) {
138 MemTableInserter inserter;
139 inserter.sequence_ = WriteBatchInternal::Sequence(b);
140 inserter.mem_ = memtable;
141 return b->Iterate(&inserter);
142 }
根据以上实现,可知WriteBatch
中rep_
的格式:seq_number + count + record(type + keysize + key + valuesize + value) + .... + record;
如果是删除的话则不会有value相关的字段。
下面是写的接口,为了简单化起见,不考虑中间的合并等情况,后面会慢慢分析,单纯的写数据:
1204 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1205 Writer w(&mutex_);
1206 w.batch = my_batch;
1207 w.sync = options.sync;
1208 w.done = false;
1209
1210 MutexLock l(&mutex_);
1211 writers_.push_back(&w);
1212 while (!w.done && &w != writers_.front()) {
1213 w.cv.Wait();
1214 }
1215 if (w.done) {
1216 return w.status;
1217 }
1218 //more code...
1258 while (true) {
1259 Writer* ready = writers_.front();
1260 writers_.pop_front();
1261 if (ready != &w) {
1262 ready->status = status;
1263 ready->done = true;
1264 ready->cv.Signal();
1265 }
1266 if (ready == last_writer) break;
1267 }
1268
1269 // Notify new head of write queue
1270 if (!writers_.empty()) {
1271 writers_.front()->cv.Signal();
1272 }
1273
1274 return status;
1275 }
以上是在多线程写的时候,会进入到队列中等待,由其他线程帮忙处理写请求,满足条件(w.done == false and writers_.front() == w
)的写线程进行合并写请求;当处理完后会一个个设置Writer的w.done为真并唤醒,之后被唤醒的写线程由于done为真就返回,剩下的再继续写;
1204 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1219 // May temporarily unlock and wait.
1220 Status status = MakeRoomForWrite(my_batch == nullptr);
1221 uint64_t last_sequence = versions_->LastSequence();
1222 Writer* last_writer = &w;
1223 if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
1224 WriteBatch* updates = BuildBatchGroup(&last_writer);
1225 WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1226 last_sequence += WriteBatchInternal::Count(updates);
1232 {
1233 mutex_.Unlock();
1234 status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1235 bool sync_error = false;//写log
1236 if (status.ok() && options.sync) {
1237 status = logfile_->Sync();
1238 if (!status.ok()) {
1239 sync_error = true;
1240 }
1241 }
1242 if (status.ok()) {
1243 status = WriteBatchInternal::InsertInto(updates, mem_);
1244 }
1245 mutex_.Lock();
1246 if (sync_error) {
1250 RecordBackgroundError(status);
1251 }
1252 }
1253 if (updates == tmp_batch_) tmp_batch_->Clear();
1254
1255 versions_->SetLastSequence(last_sequence);
1256 }
这里暂时不分析MakeRoomForWrite
的实现;其中BuildBatchGroup
会打包写请求:
1279 WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1280 mutex_.AssertHeld();
1281 assert(!writers_.empty());
1282 Writer* first = writers_.front();
1283 WriteBatch* result = first->batch;
1284 assert(result != nullptr);
1286 size_t size = WriteBatchInternal::ByteSize(first->batch);
1291 size_t max_size = 1 << 20;
1292 if (size <= (128<<10)) {//调整本次打包的大小
1293 max_size = size + (128<<10);
1294 }
1296 *last_writer = first;
1297 std::deque<Writer*>::iterator iter = writers_.begin();
1298 ++iter; // Advance past "first"
1299 for (; iter != writers_.end(); ++iter) {//挨个合并
1300 Writer* w = *iter;
1301 if (w->sync && !first->sync) {//遇到同步退出
1302 // Do not include a sync write into a batch handled by a non-sync write.
1303 break;
1304 }
1305
1306 if (w->batch != nullptr) {
1307 size += WriteBatchInternal::ByteSize(w->batch);
1308 if (size > max_size) {//超过设定大小
1309 // Do not make batch too big
1310 break;
1311 }
1312
1313 // Append to *result
1314 if (result == first->batch) {
1315 // Switch to temporary batch instead of disturbing caller's batch
1316 result = tmp_batch_;
1317 assert(WriteBatchInternal::Count(result) == 0);
1318 WriteBatchInternal::Append(result, first->batch);
1319 }
1320 WriteBatchInternal::Append(result, w->batch);
1321 }
1322 *last_writer = w;
1323 }
1324 return result;
1325 }
通过以上代码实现,多线程写时,通过锁和条件变量,把自己push到队列并阻塞,由其他队头的写进行可能的合并打包,然后批量先写日志,具体格式后面再分析或参考网上资料,写完日志后,再把记录插入到skiplist后,再更新seq,再依次唤醒队列中的写线程。
这里的写在rocksdb中进行了优化,可以参考下面的链接。
参考资料
基于RocksDB做的key-value分离,解决value写放大严重的问题
key-value分离论文
RocksDB 写入流程详解
rocksdb源码分析 写优化之JoinBatchGroup
LevelDB Compaction 引发的 Data Inconsistency
Leveldb源码分析--3
LevelDB设计与实现 - 读写流程