




134 class InternalKey {
135  private:
136   std::string rep_;
137  public:
138   InternalKey() { }   // Leave rep_ as empty to indicate it is invalid
139   InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
140     AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
141   }

 12 static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {  // Packs seq into bits 8..63 and the type tag into the low 8 bits.
 13   assert(seq <= kMaxSequenceNumber);  // NOTE(review): presumably keeps seq << 8 from overflowing 64 bits — confirm kMaxSequenceNumber
 14   assert(t <= kValueTypeForSeek);  // presumably kValueTypeForSeek is the largest ValueType — confirm
 15   return (seq << 8) | t;
 16 }   
 18 void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {  // Serializes key as: user_key bytes, then the packed (sequence, type) tag.
 19   result->append(key.user_key.data(), key.user_key.size());  // raw user key bytes, no length prefix
 20   PutFixed64(result, PackSequenceAndType(key.sequence, key.type));  // fixed-width 64-bit tag: (sequence << 8) | type
 21 }

 70 struct ParsedInternalKey {  // Decomposed internal key: user_key + sequence + type
 71   Slice user_key;  // NOTE(review): Slice presumably references (does not own) the bytes — caller must keep them alive
 72   SequenceNumber sequence;
 73   ValueType type;
 76   ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
 77       : user_key(u), sequence(seq), type(t) { }
 79 };

178 // A helper class useful for DBImpl::Get()
179 class LookupKey {
180  public:
181   // Initialize *this for looking up user_key at a snapshot with
182   // the specified sequence number.
183   LookupKey(const Slice& user_key, SequenceNumber sequence);
185   ~LookupKey();  // NOTE(review): presumably frees the heap buffer when the key did not fit in space_ — definition not shown
187   // Return a key suitable for lookup in a MemTable (varint32 length prefix + internal key).
188   Slice memtable_key() const { return Slice(start_, end_ - start_); }
190   // Return an internal key (suitable for passing to an internal iterator): user key + 8-byte tag.
191   Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
193   // Return the user key (the internal key without its trailing 8-byte tag)
194   Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
196  private:
204   const char* start_;   // beginning of the encoded key (varint32 length prefix)
205   const char* kstart_;  // beginning of the internal key (user key bytes)
206   const char* end_;     // one past the 8-byte (sequence, type) tag
207   char space_[200];      // Avoid allocation for short keys
209   // No copying allowed
210   LookupKey(const LookupKey&);
211   void operator=(const LookupKey&);
212 };

121 LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {  // Layout: varint32(usize + 8) | user_key | fixed64((s << 8) | kValueTypeForSeek)
122   size_t usize = user_key.size();
123   size_t needed = usize + 13;  // A conservative estimate: presumably up to 5 bytes of varint32 + the 8-byte tag
124   char* dst;
125   if (needed <= sizeof(space_)) {  // short keys use the inline buffer
126     dst = space_;
127   } else {
128     dst = new char[needed];  // long keys go on the heap; NOTE(review): release must happen in ~LookupKey (not shown)
129   }
130   start_ = dst;
131   dst = EncodeVarint32(dst, usize + 8);  // length of the internal key (user key + 8-byte tag)
132   kstart_ = dst;
133   memcpy(dst, user_key.data(), usize);
134   dst += usize;
135   EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));  // tag carrying the snapshot sequence s
136   dst += 8;
137   end_ = dst; 
138 } 



125   virtual void Put(const Slice& key, const Slice& value) {  // MemTableInserter::Put (excerpt): writes one value record into the memtable
126     mem_->Add(sequence_, kTypeValue, key, value);
127     sequence_++;  // each record in the batch consumes its own sequence number
128   }

 82 void MemTable::Add(SequenceNumber s, ValueType type,  // Encodes one entry into arena memory and inserts it into table_
 83                    const Slice& key,
 84                    const Slice& value) {
 85   // Format of an entry is concatenation of:
 86   //  key_size     : varint32 of internal_key.size()
 87   //  key bytes    : char[internal_key.size()]
 88   //  value_size   : varint32 of value.size()
 89   //  value bytes  : char[value.size()]
 90   size_t key_size = key.size();
 91   size_t val_size = value.size();
 92   size_t internal_key_size = key_size + 8;  // internal key = user key + 8-byte (seq, type) tag
 93   const size_t encoded_len =
 94       VarintLength(internal_key_size) + internal_key_size +
 95       VarintLength(val_size) + val_size;  // total size of the encoded entry
 96   char* buf = arena_.Allocate(encoded_len);
 97   char* p = EncodeVarint32(buf, internal_key_size);  // varint32 internal key size
 98   memcpy(p, key.data(), key_size);  // user key bytes
 99   p += key_size;
100   EncodeFixed64(p, (s << 8) | type);  // packed seq + type; same packing as PackSequenceAndType but without its asserts
101   p += 8;
102   p = EncodeVarint32(p, val_size);  // varint32 value size
103   memcpy(p, value.data(), val_size);  // value bytes
104   assert(p + val_size == buf + encoded_len);  // the precomputed length must match the bytes actually written
105   table_.Insert(buf);  // passes the arena-allocated entry; NOTE(review): table_'s type is not shown here
106 }



1487 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {  // A single Put is wrapped in a one-record WriteBatch
1488   WriteBatch batch;
1489   batch.Put(key, value);
1490   return Write(opt, &batch);  // single writes share the batched Write() path
1491 }
1493 Status DB::Delete(const WriteOptions& opt, const Slice& key) {  // A single Delete is wrapped in a one-record WriteBatch
1494   WriteBatch batch;
1495   batch.Delete(key);
1496   return Write(opt, &batch);  // deletes go through the same Write() path as puts
1497 }

 26 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
 27 static const size_t kHeader = 12;  // 8 + 4 bytes

 32 class LEVELDB_EXPORT WriteBatch {  // Buffers Put/Delete records in rep_ (excerpt: most members elided)
 33  public:
 34   WriteBatch();
 73  private:
 74   friend class WriteBatchInternal;
 76   std::string rep_;  // See comment in write_batch.cc for the format of rep_; as used here: kHeader bytes, then tagged, length-prefixed records
 77 };

102 void WriteBatch::Put(const Slice& key, const Slice& value) {  // Appends a value record: tag byte, length-prefixed key, length-prefixed value
103   WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the batch's record count
104   rep_.push_back(static_cast<char>(kTypeValue));
105   PutLengthPrefixedSlice(&rep_, key);
106   PutLengthPrefixedSlice(&rep_, value);
107 }
109 void WriteBatch::Delete(const Slice& key) {  // A delete is also just an appended record: tag byte + length-prefixed key, no value
110   WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);  // bump the batch's record count
111   rep_.push_back(static_cast<char>(kTypeDeletion));
112   PutLengthPrefixedSlice(&rep_, key);
113 }
 46 Status WriteBatch::Iterate(Handler* handler) const {
 47   Slice input(rep_);
 48   if (input.size() < kHeader) {
 49     return Status::Corruption("malformed WriteBatch (too small)");
 50   }
 52   input.remove_prefix(kHeader);
 53   Slice key, value;
 54   int found = 0;
 55   while (!input.empty()) {
 56     found++;
 57     char tag = input[0];
 58     input.remove_prefix(1);
 59     switch (tag) {
 60       case kTypeValue://修改
 61         if (GetLengthPrefixedSlice(&input, &key) &&
 62             GetLengthPrefixedSlice(&input, &value)) {
 63           handler->Put(key, value);
 64         } else {
 65           return Status::Corruption("bad WriteBatch Put");
 66         }
 67         break;
 68       case kTypeDeletion://删除
 69         if (GetLengthPrefixedSlice(&input, &key)) {
 70           handler->Delete(key);
 71         } else {
 72           return Status::Corruption("bad WriteBatch Delete");
 73         }
 74         break;
 75       default:
 76         return Status::Corruption("unknown WriteBatch tag");
 77     }
 78   }

119 namespace {
120 class MemTableInserter : public WriteBatch::Handler {  // Handler that replays each WriteBatch record into a MemTable
121  public:
122   SequenceNumber sequence_;  // sequence number for the next record; advanced after every Put/Delete
123   MemTable* mem_;  // target memtable; set by the caller (see InsertInto)
125   virtual void Put(const Slice& key, const Slice& value) {
126     mem_->Add(sequence_, kTypeValue, key, value);
127     sequence_++;
128   }
129   virtual void Delete(const Slice& key) {
130     mem_->Add(sequence_, kTypeDeletion, key, Slice());  // a deletion is stored as a kTypeDeletion entry with an empty value
131     sequence_++;
132   }
133 };
134 }  // namespace

136 Status WriteBatchInternal::InsertInto(const WriteBatch* b,  // Applies every record of b to memtable
137                                       MemTable* memtable) {
138   MemTableInserter inserter;
139   inserter.sequence_ = WriteBatchInternal::Sequence(b);  // first record gets the batch's sequence; later ones count up
140   inserter.mem_ = memtable;
141   return b->Iterate(&inserter);  // returns Iterate's status (Corruption on a malformed batch)
142 }



1204 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {  // Excerpt: write-queue handling only; the group write is elided below
1205   Writer w(&mutex_);
1206   w.batch = my_batch;
1207   w.sync = options.sync;
1208   w.done = false;
1210   MutexLock l(&mutex_);  // presumably a scoped lock held for the rest of the function
1211   writers_.push_back(&w);  // enqueue this writer
1212   while (!w.done && &w != writers_.front()) {  // sleep until another writer finished our batch or we reach the queue head
1213     w.cv.Wait();
1214   }
1215   if (w.done) {  // our batch was written as part of another writer's group; report that group's status
1216     return w.status;
1217   }
1218   //more code... (elided: MakeRoomForWrite, BuildBatchGroup, log write, memtable insert; defines status and last_writer)
1258   while (true) {  // pop every writer in the finished group, up to and including last_writer
1259     Writer* ready = writers_.front();
1260     writers_.pop_front();
1261     if (ready != &w) {  // wake the other group members with the group's status
1262       ready->status = status;
1263       ready->done = true;
1264       ready->cv.Signal();
1265     }
1266     if (ready == last_writer) break;
1267   }
1269   // Notify new head of write queue so it can lead the next group
1270   if (!writers_.empty()) {
1271     writers_.front()->cv.Signal();
1272   }
1274   return status;
1275 }

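以上代码是多线程并发写入时的排队逻辑:每个写线程先把自己的 Writer 放入 writers_ 队列,然后等待,直到满足以下任一条件才继续执行——w.done 为 true(其写请求已被其他线程合并写入),或它已成为队首(&w == writers_.front())。成为队首且 done 仍为 false 的线程负责把后续排队的写请求合并成一组并写入;写完后,它依次把组内其他 Writer 的 done 置为 true 并唤醒它们,这些线程醒来后发现 done 为 true 便直接返回;最后它唤醒新的队首,由新队首继续合并并写入剩余的请求。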

1204 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1219   // May temporarily unlock and wait.
1220   Status status = MakeRoomForWrite(my_batch == nullptr);
1221   uint64_t last_sequence = versions_->LastSequence();
1222   Writer* last_writer = &w;
1223   if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
1224     WriteBatch* updates = BuildBatchGroup(&last_writer);
1225     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1226     last_sequence += WriteBatchInternal::Count(updates);
1232     {
1233       mutex_.Unlock();
1234       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1235       bool sync_error = false;//写log
1236       if (status.ok() && options.sync) {
1237         status = logfile_->Sync();
1238         if (!status.ok()) {
1239           sync_error = true;
1240         }
1241       }
1242       if (status.ok()) {
1243         status = WriteBatchInternal::InsertInto(updates, mem_);
1244       }
1245       mutex_.Lock();
1246       if (sync_error) {
1250         RecordBackgroundError(status);
1251       }
1252     }
1253     if (updates == tmp_batch_) tmp_batch_->Clear();
1255     versions_->SetLastSequence(last_sequence);
1256   }


1279 WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {  // Merges the queue head's batch with following writers' batches; sets *last_writer to the last one included
1280   mutex_.AssertHeld();
1281   assert(!writers_.empty());
1282   Writer* first = writers_.front();
1283   WriteBatch* result = first->batch;
1284   assert(result != nullptr);
1286   size_t size = WriteBatchInternal::ByteSize(first->batch);
1291   size_t max_size = 1 << 20;  // default group limit: 1 MiB
1292   if (size <= (128<<10)) {  // small leading write (<= 128 KiB): cap the group at its size + 128 KiB; presumably so a small write is not delayed by a large group
1293     max_size = size + (128<<10);
1294   }
1296   *last_writer = first;
1297   std::deque<Writer*>::iterator iter = writers_.begin();
1298   ++iter;  // Advance past "first"
1299   for (; iter != writers_.end(); ++iter) {  // try to merge the following writers one by one
1300     Writer* w = *iter;
1301     if (w->sync && !first->sync) {  // stop at a sync writer when the leader is not sync
1302       // Do not include a sync write into a batch handled by a non-sync write.
1303       break;
1304     }
1306     if (w->batch != nullptr) {
1307       size += WriteBatchInternal::ByteSize(w->batch);
1308       if (size > max_size) {  // the group would exceed max_size
1309         // Do not make batch too big
1310         break;
1311       }
1313       // Append to *result
1314       if (result == first->batch) {
1315         // Switch to temporary batch instead of disturbing caller's batch
1316         result = tmp_batch_;
1317         assert(WriteBatchInternal::Count(result) == 0);
1318         WriteBatchInternal::Append(result, first->batch);
1319       }
1320       WriteBatchInternal::Append(result, w->batch);
1321     }
1322     *last_writer = w;  // writers with a null batch are still counted as part of the group
1323   }
1324   return result;  // either first->batch itself or tmp_batch_ holding the merged records
1325 }



RocksDB 写入流程详解
rocksdb源码分析 写优化之JoinBatchGroup
LevelDB Compaction 引发的 Data Inconsistency
LevelDB设计与实现 - 读写流程

  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354
