ETCD《五》--内存MVCC

初始化

MVCC初始化的时候会建立一个 kvindex

s := &store{
        cfg:     cfg,
        b:       b,
        kvindex: newTreeIndex(lg),

        le: le,

        currentRev:     1,
        compactMainRev: -1,

        fifoSched: schedule.NewFIFOScheduler(lg),

        stopc: make(chan struct{}),

        lg: lg,
    }

这个 kvindex 实际上是一个 32 阶的 btree ,每个节点最多 32 个子节点,同时指定了 B-tree 的比较器,用于排序 key,这里更小的返回 true ,即左插法

func newTreeIndex(lg *zap.Logger) index {
    return &treeIndex{
        tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
            return aki.Less(bki)
        }),
        lg: lg,
    }
}

这里的 btree 可以理解为 b+ 树,一种内存中的平衡多路搜索树,在结构上接近 b+ 树,但不是传统数据库中定义的严格 b+ 树:数据存储主要存放在叶子节点,但是没有指针链表链接叶子节点

初始化时同时还会往底层 boltdb 中写入一些 Bucket 信息,包括 key、meta 这两个 Bucket

tx := s.b.BatchTx()
    tx.LockOutsideApply()
    // 创建 "key" Bucket
    tx.UnsafeCreateBucket(schema.Key)

    // 创建 "meta" Bucket
    schema.UnsafeCreateMetaBucket(tx)
    tx.Unlock()
    s.b.ForceCommit()

插入

MVCC中写入数据时,会先开启一个写事务

func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    s.mu.RLock()
    tx := s.b.BatchTx()
    tx.LockInsideApply()
    tw := &storeTxnWrite{
        storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
        tx:             tx,
        beginRev:       s.currentRev,
        changes:        make([]mvccpb.KeyValue, 0, 4),
    }
    return newMetricsTxnWrite(tw)
}

需要注意的一些点:

  • s.mu.RLock()加只读锁,仅用于并发读取当前的 currentRev,因为还不涉及写操作,所以这里只需要加只读锁

  • 获取 boltdb 中的 batch 写事务并加独占锁;

  • 使用 changes 来记录事务内修改的 kv 列表,容量初始为 4

最后,这里返回的是一个包装了指标数据的写事务,会通过指标记录操作的次数

  • etcd_mvcc_txn_total:记录事务操作的次数,range、put、delete等操作都算事务操作

  • etcd_mvcc_range_total:range 操作的次数

  • etcd_mvcc_put_total:put 操作的次数

  • etcd_debugging_mvcc_total_put_size_in_bytes:put 操作提交的 KV 总占用字节数

  • etcd_mvcc_delete_total:delete 操作的次数

首次写入

写入数据的时候,会先尝试从 btree 中查询这个 key,首次写入时肯定无法找到这个 key

_, created, ver, err := tw.s.kvindex.Get(key, rev)

然后会使用 currentRev + 1 来作为本次写事务的初始版本号; 通过 currentRev + 1 来作为 Main、changes的长度来做为 Sub,共同构成 Revision;这个 Revision 会作为底层 boltdb 的索引 key

rev := tw.beginRev + 1

ibytes := NewRevBytes()
    idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
    ibytes = RevToBytes(idxRev, ibytes)

然后再构建底层 boltdb 中的value,会记录 Key 、value、创建的Revision号、上一次修改的Revision号、版本(从1开始递增)、租约

kv := mvccpb.KeyValue{
        Key:            key,
        Value:          value,
        CreateRevision: c,
        ModRevision:    rev,
        Version:        ver,
        Lease:          int64(leaseID),
    }

d, err := kv.Marshal()

写入底层 boltdb,写入的是 "key" 这个 Bucket;索引就是上面的 Revision ,包括 Main 和 Sub;value 就是序列化之后的 kv 信息

tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)

最后再写入内存MVCC,实际就是写入到 btree 中,写入的 key 才是用户提交的 key , value 是Bucket中的索引 Revision ,包括 Main 和 Sub

tw.s.kvindex.Put(key, idxRev)

考虑到每个 kv 都需要记录其变更的历史版本,在写入内存MVCC中的时候,除了会记录这个用户 key 的当前 Revision ,还会保留它的历史版本

func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
    rev := Revision{Main: main, Sub: sub}

    if len(ki.generations) == 0 {
        ki.generations = append(ki.generations, generation{})
    }
    g := &ki.generations[len(ki.generations)-1]
    if len(g.revs) == 0 { // create a new key
        keysGauge.Inc()
        g.created = rev
    }
    g.revs = append(g.revs, rev)
    g.ver++
    ki.modified = rev
}
  • 每个 Main + Sub 才会作为一个 Revision{Main: main, Sub: sub},即一个版本

  • 通过 generation 中的 revs 数组来记录历史版本,如果不存在 revs ,说明没有历史版本,此时会记录这个用户 key 的创建版本,即当前版本

  • 通过 generation 的 ver 来记录版本数量,同时将当前版本 Revision{Main: main, Sub: sub} 做为最后修改的一个版本

多次写入

对同一个 key 多次写入时,流程也和上述单次写入基本一致,正如上面所说,在写入之前都会先检查 btree 中是否存在这个 key

_, created, ver, err := tw.s.kvindex.Get(key, rev)

多次写入时,就能正常查询到这个 key 了;具体的会从后往前先找到一个 generation,这个 generation 的第一个版本 Main 小于当前写事务的版本号

rev := tw.beginRev + 1

g.revs[0].Main <= rev

然后再从后往前的遍历这个 generation 的所有版本,找到第一个不超过当前写事务版本号的第一个版本作为当前 key的创建版本;

n := g.walk(func(rev Revision) bool { return rev.Main > atRev })

c = created.Main

其它流程就完全一致了,写入到底层 boltdb 中的 value 是最新的 value;而写入到 btree 中的时候会将当前版本追加到 generation 中的 revs 数组中

完成

插入操作需要通过 End 方法来结束当前的事务操作

对于 MetricsTxnWrite 写事务来说,它的 End 方法就是推送上述提到的指标

而对于 StoreTxnWrite 写事务来说,他会在 End 方法中更新 currentRev,即更新全局版本号,通过 revMu 这个锁来控制;释放 boltdb 中的 batch 写事务的锁;同时释放 tw.s.mu.RUnlock()只读锁,对应的就是开始写事务时加的只读锁

func (tw *storeTxnWrite) End() {
    // only update index if the txn modifies the mvcc state.
    if len(tw.changes) != 0 {
        // hold revMu lock to prevent new read txns from opening until writeback.
        tw.s.revMu.Lock()
        tw.s.currentRev++
    }
    tw.tx.Unlock()
    if len(tw.changes) != 0 {
        tw.s.revMu.Unlock()
    }
    tw.s.mu.RUnlock()
}

读取

由于 MVCC 中保存的是用户 key 和 Revision 之间的映射关系,需要获取到 value ,就必须读 boltdb

计数查询

但是如果只是计数 count 的话,就只需要读 MVCC 就可以了

如果没有指定 end key ,那么就只需要从 btree 中查询开始 key 就可以了;查询逻辑与上述多次写入时的查询一致,即先从后往前先找到一个 generation,这个 generation 的第一个版本 Main 小于当前全局 currentRev ;然后再从后往前的遍历这个 generation 的所有版本,找到第一个不超过当前全局 currentRev 的第一个版本即认为找到了,返回个数1;否则返回个数 0

if end == nil {
        _, _, _, err := ti.unsafeGet(key, atRev)
        if err != nil {
            return 0
        }
        return 1
    }

如果指定了 end key ;那么就会遍历 btree ,直到 btree 中的 key 大于等于 end key 停止;一种特殊情况就是 end key 是空数据的时候,会一直遍历到 btree 的最后,这通常对应的是查询时候指定了--from-key等条件;

total := 0
    ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
        if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
            total++
        }
        return true
    })

ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
        if len(endi.key) > 0 && !item.Less(endi) {
            return false
        }
        if !f(item) {
            return false
        }
        return true
    })

KV查询

但如果是KV查询,也是先按照计数查询相同的逻辑找到满足条件的那些版本,不只是计数了,需要返回对应的版本

然后会依次对这些版本进行遍历去查询 boltdb 底层数据

for i, revpair := range revpairs[:len(kvs)] {

对于每一个版本,都需要先恢复出 boltdb 中的索引,即 Revision{Main: main, Sub: sub}

func RevToBytes(rev Revision, bytes []byte) []byte {
    return BucketKeyToBytes(newBucketKey(rev.Main, rev.Sub, false), bytes)
}

然后根据这个索引去查询 boltdb 中的 "key" 这个 Bucket

_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)

在查询底层 boltdb 时,会做缓存优化,会优先读取 buckets 缓存中是否已经加载过该 Bucket,如果没有加载过该 Bucket,会从底层将这个 Bucket 加载到内存缓存起来,这样上面循环下个 Revision{Main: main, Sub: sub} 时就可以直接从缓存中加载这个 Bucket

bn := bucketType.ID()
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketType.Name())
        baseReadTx.buckets[bn] = bucket
    }

最后再通过 Bucket的游标来查询 Revision{Main: main, Sub: sub} 这个索引对应的 value

c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))

删除

删除操作也支持 range ,因此删除前会先把需要删除的 key range 出来;range 的逻辑和上述 插入 和 读取过程是完全一样的

rrev := tw.beginRev

    keys, _ := tw.s.kvindex.Range(key, end, rrev)

然后再依次删除这些 keys

for _, key := range keys {
        tw.delete(key)
    }

每一个 key 的删除,也是先恢复出 Revision;这里的 Main 取得是全局 currentRev+1 ,意味着 key的删除版本,Sub 都是取得 0 ;还有一个需要注意的标志位是 tombstone =true,这个标志位用于标识这个 key 已经被删除,意味墓碑的意思

idxRev := newBucketKey(tw.beginRev+1, int64(len(tw.changes)), true)

然后重新构建 KV 进行插入,可以看到这里 KV 的 value 是空的;同时删除操作也是通过插入来实现的,这里直接把 value 为空更新到 "key" 这个 Bucket 中去

kv := mvccpb.KeyValue{Key: key}
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)

然后再更新MVCC中的索引,更新索引的时候会先把 Revision{Main: currentRev+1, Sub: 0} 插入到现有的 generation 中去,然后再给这个用户 key 追加一个空的 generation;最后还包括指标的记录来减少etcd_debugging_mvcc_keys_total

err = tw.s.kvindex.Tombstone(key, idxRev.Revision)

func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
    ki.put(lg, main, sub)
    ki.generations = append(ki.generations, generation{})
    keysGauge.Dec()
    return nil
}

这里追加一个空 generation 是查询不到的关键;再上述的 range 逻辑中,会从后往前先找到一个 generation,可以看到这里会跳过版本 revs 为空的 generation,然后在处理非最后一个 generation 并且该 generation 的所有版本都不高于全局版本号时,会直接返回空 generation

func (ki *keyIndex) findGeneration(rev int64) *generation {
    lastg := len(ki.generations) - 1
    cg := lastg

    for cg >= 0 {
        if len(ki.generations[cg].revs) == 0 {
            cg--
            continue
        }
        g := ki.generations[cg]
        if cg != lastg {
            if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev {
                return nil
            }
        }
        if g.revs[0].Main <= rev {
            return &ki.generations[cg]
        }
        cg--
    }
    return nil
}

完成

删除操作的结束流程也和插入操作的一样,也会通过 End 来使全局版本号自增

boltdb 操作

上述插入和删除操作都涉及底层 boltdb 的写入操作,这些写入操作都是通过 boltdb 事务来实现的,每次更新数据只会增加 pending 数量

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
    bucket := t.tx.Bucket(bucketType.Name())
    if err := bucket.Put(key, value); err != nil {
        
    }
    t.pending++
}

事务的提交是异步定时执行的,默认是每 100ms 提交一次事务,也就是这 100ms 内的操作会在一个事务里提交

func (b *backend) run() {
    defer close(b.donec)
    t := time.NewTimer(b.batchInterval)
    defer t.Stop()
    for {
        select {
        case <-t.C:
        case <-b.stopc:
            b.batchTx.CommitAndStop()
            return
        }
        if b.batchTx.pending  != 0 {
            b.batchTx.Commit()
        }
        t.Reset(b.batchInterval)
    }
}

事务提交之后,会再次开启新的事务

if !stop {
        t.tx = t.backend.begin(true)
    }

总结

  • MVCC是 KV 的内存存储模型

  • MVCC 本质上是一个 btree ,建立的是用户 key 到 Revision{Main: main, Sub: sub} 之间的索引关系

  • 底层数据都存储在 boltdb 中的 "key" 这个 Bucket中,建立的是 Revision{Main: main, Sub: sub} 到用户 value 之间的索引关系

  • MVCC 中通过 generation 数组 和 generation 中的 revs 数组来记录历史版本;generation 数组会在用户 key 被删除之后增加,generation 中的 revs 数组用于记录每个 key 的历史修改版本

  • 删除 key 不会真的删除,在底层 botdb 中会把 key 的value置空,内存 MVCC 中通过追加空 generation 来让这个 key 成为 tombstone ,在查询时就查不到这个 key 了

  • 插入和删除操作都会使全局版本号自增

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容