ETCD《九》-- compaction/defrag操作

背景

在前面 del 操作的时候,已经知道 del 操作也是通过 put 操作来完成的,简单回顾下:

  • 先操作db,将空value更新到key上

  • 在操作内存mvcc,先追加一个新的 Revision(标记为 tombstone ) 到当前的generation,再追加一个空的 generation

可以看到即使 del 操作,数据也不会被删除,可想而知的是,随着 kv 的增加和迭代,占用内存和 db 使用会越来越大

compaction

compaction 的主要目的是删除 kv 的历史版本,减少内存占用

compaction 时需要指定一个 revision ,小于 revision 的那些历史版本会被删除

raft

compaction 需要在所有节点上都执行,因此 compaction 请求会被包装成一次 raft 请求

func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
    startTime := time.Now()
    result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})

raft 通过追加 entry 的方式 Propose 到所有节点,所有节点都通过 apply 来执行 compaction

更新Store

首先会更新 store 中的 compactMainRev ;用于记录每次 compaction 到了哪一个版本,更新的值就是指定的 revision 参数

需要注意的是,指定的 revision 参数不能小于上一次已经 compaction 的版本号;同时不能大于当前的最大版本号 currentRev

if rev <= s.compactMainRev {
        ch := make(chan struct{})
        f := schedule.NewJob("kvstore_updateCompactRev_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
        s.fifoSched.Schedule(f)
        s.revMu.Unlock()
        return ch, 0, ErrCompacted
    }
    if rev > s.currentRev {
        s.revMu.Unlock()
        return nil, 0, ErrFutureRev
    }

然后往 Meta 这个 Bucket 中写入一对 kv,key 固定为scheduledCompactRev,value记录为指定的 revision 参数;并写入到 boltdb 中,用于记录当前正在执行 compaction

func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
    rbytes := NewRevBytes()
    rbytes = RevToBytes(Revision{Main: value}, rbytes)
    tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}

MVCC整理

首先会对内存中的 kvindex 进行整理删除

此时会完整拷贝一份内存中的 kvindex ;然后采用升序遍历 Ascend 的方式依次处理每一个 kv ;可以看到这里处理每一个 kv 时都会先获取锁,处理完后再释放锁,这样可以减小锁住的影响,在整理内存时让其它协程也能有机会获取到锁,执行查询和插入

func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
    available := make(map[Revision]struct{})
    ti.lg.Info("compact tree index", zap.Int64("revision", rev))
    ti.Lock()
    clone := ti.tree.Clone()
    ti.Unlock()

    clone.Ascend(func(keyi *keyIndex) bool {
        
        ti.Lock()
        keyi.compact(ti.lg, rev, available)
        if keyi.isEmpty() {
            _, ok := ti.tree.Delete(keyi)
        }
        ti.Unlock()
        return true
    })
    return available
}

每一个 kv 的处理就是从前往后遍历它的 generation 数组,直到找到一个大于指定 revision 参数的版本;这里比较的是每个 generation 的最后一个版本,也就是还需要进一步定位到这个 generation 的 revs 数组中的具体版本

genIdx, g := 0, &ki.generations[0]
for genIdx < len(ki.generations)-1 {
        if tomb := g.revs[len(g.revs)-1].Main; tomb >= atRev {
            break
        }
        genIdx++
        g = &ki.generations[genIdx]
    }

此时会从后往前的遍历这个 generation 的 revs 数组,直到找到第一个比指定 revision 参数小的版本;将这个版本保存在 available 中,为后面物理删除 boltdb 中的 kv 做准备

f := func(rev Revision) bool {
        if rev.Main <= atRev {
            available[rev] = struct{}{}
            return false
        }
        return true
    }

需要注意的是,那些已经被删除的 key ,如果它的所有版本都是小于指定 revision 参数的,那么会一直找到最新的那个空 generation;由于这是个空 generation,因此这个 key 不会被加入到 available 中;也就是后面物理删除的时候会被直接删除

定位到目标 generation 的目标 revs 后,直接删除之前的 generation 以及 revs 之前的那些版本

g := &ki.generations[genIdx]
    if !g.isEmpty() {
        // remove the previous contents.
        if revIndex != -1 {
            g.revs = g.revs[revIndex:]
        }
    }

    // remove the previous generations.
    ki.generations = ki.generations[genIdx:]

Boltdb整理

内存 MVCC 删除完成后,在开始物理删除

直接 range 查询范围内的 kv ;这里会分批进行 range,每一批的大小是 1000;

keys, values := tx.UnsafeRange(schema.Key, last, end, int64(1000))

初始的开始 key 是最小的全0;每一批执行完成后在进行更新,更新的就是上一批次的最后一个 kv 的下一个 sub +1

last := make([]byte, 8+1+8)

last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)

最终的结束 key 就是指定的 revision 参数;由于 range 查询是含左不含右,因此这里 +1 会把等于指定 revision 参数的那些 kv 也删除

end := make([]byte, 8)
    binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

range 出来的 kv ,如果在上述 available 中不存在的,那就是需要进行物理删除的,会直接从 Bucket 中删除这个 key

for i := range keys {
            rev = BytesToRev(keys[i])
            if _, ok := available [rev]; !ok {
                tx.UnsafeDelete(schema.Key, keys[i])
                keyCompactions++
            }
        }

等待所有批次都执行完成后,还会往 Meta 这个 Bucket 中写入kv ;key 就是 finishedCompactRev,value 就是指定的 revision 参数;标志着这个版本的 compaction 完成

func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
    rbytes := NewRevBytes()
    rbytes = RevToBytes(Revision{Main: value}, rbytes)
    tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
}

整个 compaction 的执行过程中,可以看到即使上一次的 compaction 还没完成,也还是会正常触发这一次的 compaction ;

但是需要等待之前的任务完成,因为任务是通过 store 的先进先出队列调度的;但是如果是 compaction 执行过程中进程重启了,那么这次 compaction 会直接执行,不管上次的失败结果

s.fifoSched.Schedule(j)

defrag

defrag的主要目标是对 kv 进行物理删除,缩小 boltdb 的使用空间

默认情况下,defrag只会清理通过 --endpoints指定的那些 etcd 节点;但是可以通过设置--cluster来请求全部的集群节点

--cluster全局

--cluster会触发一次线性读,然后再读取所有 Members

在前面 get 操作的时候,已经知道线性读会触发一次 leader 节点同步 commit index 到超过半数的 Followers

defrag影响

如果defrag的是全局节点,实际上也是每个节点依次顺序执行

defrag期间的影响:

  • 如果 defrag 的是 Followers 节点,那么当前节点的读写都会阻塞,表现就是读当前节点会阻塞,同步 Leader 日志也会阻塞;相当于当前节点不可用,如果仍有超过半数节点存活,那么整个集群仍然可用

  • 如果 defrag 的是 leader 节点,那么会触发集群的重新选主,因为 defrag 期间会锁住底层 boltdb ,Leader短暂不可用后重新选主;

因此在实际操作 defrag 时,应该尽量先 defrag Followers节点,最后在 defrag Leader 节点

开始defrag

首先锁住底层 boltdb ,这也是阻塞当前节点读写的原因

b.batchTx.LockOutsideApply()
    defer b.batchTx.Unlock()

    // lock database after lock tx to avoid deadlock.
    b.mu.Lock()
    defer b.mu.Unlock()

    // block concurrent read requests while resetting tx
    b.readTx.Lock()
    defer b.readTx.Unlock()

然后创建一个临时文件;这个临时文件将作为新的 boltdb 文件

dir := filepath.Dir(b.db.Path())
    temp, err := os.CreateTemp(dir, "db.tmp.*")

将当前节点中的 boltdb 事务提交,并不再开启新的事务

b.batchTx.unsafeCommit(true)
    b.batchTx.tx = nil

然后将旧数据库中的所有 Bucket 全部写入到新数据库中

  • 操作都是通过事务的方式操作的, 开始旧数据库的只读事务,开始新数据库的读写事务,因为要往新数据库中写入数据
tmptx, err := tmpdb.Begin(true)
tx, err := odb.Begin(false)
  • 循环读取旧数据库中的 Bucket,然后再新数据库中创建对应的 Bucket,并写入 Bucket 中的 kv 数据;每写入 10000 条数据后提交一次事务
c := tx.Cursor()

    count := 0
    for next, _ := c.First(); next != nil; next, _ = c.Next() {
        b := tx.Bucket(next)
        
        tmpb, berr := tmptx.CreateBucketIfNotExists(next)
        
        tmpb.FillPercent = 0.9 // for bucket2seq write in for each

        if err = b.ForEach(func(k, v []byte) error {
            count++
            if count > 10000 {
                err = tmptx.Commit()
                
                tmptx, err = tmpdb.Begin(true)

                tmpb = tmptx.Bucket(next)
                tmpb.FillPercent = 0.9 // for bucket2seq write in for each

                count = 0
            }
            return tmpb.Put(k, v)
        }); err != nil {
            return err
        }
    }
  • 最后再提交这两个事务即可

数据拷贝完成后,直接对临时文件进行重命名,临时文件就作为新的数据库文件了

err = os.Rename(tdbp, dbp)

最后再重新开启新数据库的读写事务,就完成了一次 defrag

b.batchTx.tx = b.unsafeBegin(true)

    b.readTx.reset()
    b.readTx.tx = b.unsafeBegin(false)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容