初始化
MVCC初始化的时候会建立一个 kvindex
s := &store{
cfg: cfg,
b: b,
kvindex: newTreeIndex(lg),
le: le,
currentRev: 1,
compactMainRev: -1,
fifoSched: schedule.NewFIFOScheduler(lg),
stopc: make(chan struct{}),
lg: lg,
}
这个 kvindex 实际上是一个 32 阶的 btree ,每个节点最多 32 个子节点,同时指定了 B-tree 的比较器,用于排序 key,这里更小的返回 true ,即左插法
func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
return aki.Less(bki)
}),
lg: lg,
}
}
这里的 btree 可以理解为 b+ 树,一种内存中的平衡多路搜索树,在结构上接近 b+ 树,但不是传统数据库中定义的严格 b+ 树:数据存储主要存放在叶子节点,但是没有指针链表链接叶子节点
初始化时同时还会往底层 boltdb 中写入一些 Bucket 信息,包括 key、meta
这两个 Bucket
tx := s.b.BatchTx()
tx.LockOutsideApply()
// 创建 "key" Bucket
tx.UnsafeCreateBucket(schema.Key)
// 创建 "meta" Bucket
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
s.b.ForceCommit()
插入
MVCC中写入数据时,会先开启一个写事务
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
需要注意的一些点:
对
s.mu.RLock()
加只读锁,仅用于并发读取当前的 currentRev,因为还不涉及写操作,所以这里只需要加只读锁获取 boltdb 中的 batch 写事务并加独占锁;
使用 changes 来记录事务内修改的 kv 列表,容量初始为 4
最后,这里返回的是一个包装了指标数据的写事务,会通过指标记录操作的次数
etcd_mvcc_txn_total:记录事务操作的次数,range、put、delete等操作都算事务操作
etcd_mvcc_range_total:range 操作的次数
etcd_mvcc_put_total:put 操作的次数
etcd_debugging_mvcc_total_put_size_in_bytes:put 操作提交的 KV 总占用字节数
etcd_mvcc_delete_total:delete 操作的次数
首次写入
写入数据的时候,会先尝试从 btree 中查询这个 key,首次写入时肯定无法找到这个 key
_, created, ver, err := tw.s.kvindex.Get(key, rev)
然后会使用 currentRev + 1 来作为本次写事务的初始版本号; 通过 currentRev + 1 来作为 Main、changes的长度来做为 Sub,共同构成 Revision;这个 Revision 会作为底层 boltdb 的索引 key
rev := tw.beginRev + 1
ibytes := NewRevBytes()
idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
ibytes = RevToBytes(idxRev, ibytes)
然后再构建底层 boltdb 中的value,会记录 Key 、value、创建的Revision号、上一次修改的Revision号、版本(从1开始递增)、租约
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
写入底层 boltdb,写入的是 "key" 这个 Bucket;索引就是上面的 Revision ,包括 Main 和 Sub;value 就是序列化之后的 kv 信息
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
最后再写入内存MVCC,实际就是写入到 btree 中,写入的 key 才是用户提交的 key , value 是Bucket中的索引 Revision ,包括 Main 和 Sub
tw.s.kvindex.Put(key, idxRev)
考虑到每个 kv 都需要记录其变更的历史版本,在写入内存MVCC中的时候,除了会记录这个用户 key 的当前 Revision ,还会保留它的历史版本
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := Revision{Main: main, Sub: sub}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
}
g := &ki.generations[len(ki.generations)-1]
if len(g.revs) == 0 { // create a new key
keysGauge.Inc()
g.created = rev
}
g.revs = append(g.revs, rev)
g.ver++
ki.modified = rev
}
每个 Main + Sub 才会作为一个 Revision{Main: main, Sub: sub},即一个版本
通过 generation 中的 revs 数组来记录历史版本,如果不存在 revs ,说明没有历史版本,此时会记录这个用户 key 的创建版本,即当前版本
通过 generation 的 ver 来记录版本数量,同时将当前版本 Revision{Main: main, Sub: sub} 做为最后修改的一个版本
多次写入
对同一个 key 多次写入时,流程也和上述单次写入基本一致,正如上面所说,在写入之前都会先检查 btree 中是否存在这个 key
_, created, ver, err := tw.s.kvindex.Get(key, rev)
多次写入时,就能正常查询到这个 key 了;具体的会从后往前先找到一个 generation,这个 generation 的第一个版本 Main 小于当前写事务的版本号
rev := tw.beginRev + 1
g.revs[0].Main <= rev
然后再从后往前的遍历这个 generation 的所有版本,找到第一个不超过当前写事务版本号的第一个版本作为当前 key的创建版本;
n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
c = created.Main
其它流程就完全一致了,写入到底层 boltdb 中的 value 是最新的 value;而写入到 btree 中的时候会将当前版本追加到 generation 中的 revs 数组中
完成
插入操作需要通过 End 方法来结束当前的事务操作
对于 MetricsTxnWrite 写事务来说,它的 End 方法就是推送上述提到的指标
而对于 StoreTxnWrite 写事务来说,他会在 End 方法中更新 currentRev,即更新全局版本号,通过 revMu 这个锁来控制;释放 boltdb 中的 batch 写事务的锁;同时释放 tw.s.mu.RUnlock()
只读锁,对应的就是开始写事务时加的只读锁
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
读取
由于 MVCC 中保存的是用户 key 和 Revision 之间的映射关系,需要获取到 value ,就必须读 boltdb
计数查询
但是如果只是计数 count 的话,就只需要读 MVCC 就可以了
如果没有指定 end key ,那么就只需要从 btree 中查询开始 key 就可以了;查询逻辑与上述多次写入时的查询一致,即先从后往前先找到一个 generation,这个 generation 的第一个版本 Main 小于当前全局 currentRev ;然后再从后往前的遍历这个 generation 的所有版本,找到第一个不超过当前全局 currentRev 的第一个版本即认为找到了,返回个数1;否则返回个数 0
if end == nil {
_, _, _, err := ti.unsafeGet(key, atRev)
if err != nil {
return 0
}
return 1
}
如果指定了 end key ;那么就会遍历 btree ,直到 btree 中的 key 大于等于 end key 停止;一种特殊情况就是 end key 是空数据的时候,会一直遍历到 btree 的最后,这通常对应的是查询时候指定了--from-key
等条件;
total := 0
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++
}
return true
})
ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
if !f(item) {
return false
}
return true
})
KV查询
但如果是KV查询,也是先按照计数查询相同的逻辑找到满足条件的那些版本,不只是计数了,需要返回对应的版本
然后会依次对这些版本进行遍历去查询 boltdb 底层数据
for i, revpair := range revpairs[:len(kvs)] {
对于每一个版本,都需要先恢复出 boltdb 中的索引,即 Revision{Main: main, Sub: sub}
func RevToBytes(rev Revision, bytes []byte) []byte {
return BucketKeyToBytes(newBucketKey(rev.Main, rev.Sub, false), bytes)
}
然后根据这个索引去查询 boltdb 中的 "key" 这个 Bucket
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
在查询底层 boltdb 时,会做缓存优化,会优先读取 buckets 缓存中是否已经加载过该 Bucket,如果没有加载过该 Bucket,会从底层将这个 Bucket 加载到内存缓存起来,这样上面循环下个 Revision{Main: main, Sub: sub} 时就可以直接从缓存中加载这个 Bucket
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
lockHeld := false
if !ok {
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
最后再通过 Bucket的游标来查询 Revision{Main: main, Sub: sub} 这个索引对应的 value
c := bucket.Cursor()
baseReadTx.txMu.Unlock()
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
删除
删除操作也支持 range ,因此删除前会先把需要删除的 key range 出来;range 的逻辑和上述 插入 和 读取过程是完全一样的
rrev := tw.beginRev
keys, _ := tw.s.kvindex.Range(key, end, rrev)
然后再依次删除这些 keys
for _, key := range keys {
tw.delete(key)
}
每一个 key 的删除,也是先恢复出 Revision;这里的 Main 取得是全局 currentRev+1 ,意味着 key的删除版本,Sub 都是取得 0 ;还有一个需要注意的标志位是 tombstone =true,这个标志位用于标识这个 key 已经被删除,意味墓碑的意思
idxRev := newBucketKey(tw.beginRev+1, int64(len(tw.changes)), true)
然后重新构建 KV 进行插入,可以看到这里 KV 的 value 是空的;同时删除操作也是通过插入来实现的,这里直接把 value 为空更新到 "key" 这个 Bucket 中去
kv := mvccpb.KeyValue{Key: key}
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
然后再更新MVCC中的索引,更新索引的时候会先把 Revision{Main: currentRev+1, Sub: 0} 插入到现有的 generation 中去,然后再给这个用户 key 追加一个空的 generation;最后还包括指标的记录来减少etcd_debugging_mvcc_keys_total
err = tw.s.kvindex.Tombstone(key, idxRev.Revision)
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}
这里追加一个空 generation 是查询不到的关键;再上述的 range 逻辑中,会从后往前先找到一个 generation,可以看到这里会跳过版本 revs 为空的 generation,然后在处理非最后一个 generation 并且该 generation 的所有版本都不高于全局版本号时,会直接返回空 generation
func (ki *keyIndex) findGeneration(rev int64) *generation {
lastg := len(ki.generations) - 1
cg := lastg
for cg >= 0 {
if len(ki.generations[cg].revs) == 0 {
cg--
continue
}
g := ki.generations[cg]
if cg != lastg {
if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev {
return nil
}
}
if g.revs[0].Main <= rev {
return &ki.generations[cg]
}
cg--
}
return nil
}
完成
删除操作的结束流程也和插入操作的一样,也会通过 End 来使全局版本号自增
boltdb 操作
上述插入和删除操作都涉及底层 boltdb 的写入操作,这些写入操作都是通过 boltdb 事务来实现的,每次更新数据只会增加 pending 数量
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketType.Name())
if err := bucket.Put(key, value); err != nil {
}
t.pending++
}
事务的提交是异步定时执行的,默认是每 100ms 提交一次事务,也就是这 100ms 内的操作会在一个事务里提交
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.pending != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
事务提交之后,会再次开启新的事务
if !stop {
t.tx = t.backend.begin(true)
}
总结
MVCC是 KV 的内存存储模型
MVCC 本质上是一个 btree ,建立的是用户 key 到 Revision{Main: main, Sub: sub} 之间的索引关系
底层数据都存储在 boltdb 中的 "key" 这个 Bucket中,建立的是 Revision{Main: main, Sub: sub} 到用户 value 之间的索引关系
MVCC 中通过 generation 数组 和 generation 中的 revs 数组来记录历史版本;generation 数组会在用户 key 被删除之后增加,generation 中的 revs 数组用于记录每个 key 的历史修改版本
删除 key 不会真的删除,在底层 botdb 中会把 key 的value置空,内存 MVCC 中通过追加空 generation 来让这个 key 成为 tombstone ,在查询时就查不到这个 key 了
插入和删除操作都会使全局版本号自增