背景
在前面 del 操作的时候,已经知道 del 操作也是通过 put 操作来完成的,简单回顾下:
先操作db,将空value更新到key上
在操作内存mvcc,先追加一个新的 Revision(标记为 tombstone ) 到当前的generation,再追加一个空的 generation
可以看到即使 del 操作,数据也不会被删除,可想而知的是,随着 kv 的增加和迭代,占用内存和 db 使用会越来越大
compaction
compaction 的主要目的是删除 kv 的历史版本,减少内存占用
compaction 时需要指定一个 revision ,小于 revision 的那些历史版本会被删除
raft
compaction 需要在所有节点上都执行,因此 compaction 请求会被包装成一次 raft 请求
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
startTime := time.Now()
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
raft 通过追加 entry 的方式 Propose 到所有节点,所有节点都通过 apply 来执行 compaction
更新Store
首先会更新 store 中的 compactMainRev ;用于记录每次 compaction 到了哪一个版本,更新的值就是指定的 revision 参数
需要注意的是,指定的 revision 参数不能小于上一次已经 compaction 的版本号;同时不能大于当前的最大版本号 currentRev
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := schedule.NewJob("kvstore_updateCompactRev_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
s.fifoSched.Schedule(f)
s.revMu.Unlock()
return ch, 0, ErrCompacted
}
if rev > s.currentRev {
s.revMu.Unlock()
return nil, 0, ErrFutureRev
}
然后往 Meta 这个 Bucket 中写入一对 kv,key 固定为scheduledCompactRev
,value记录为指定的 revision 参数;并写入到 boltdb 中,用于记录当前正在执行 compaction
func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
rbytes := NewRevBytes()
rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}
MVCC整理
首先会对内存中的 kvindex 进行整理删除
此时会完整拷贝一份内存中的 kvindex ;然后采用升序遍历 Ascend 的方式依次处理每一个 kv ;可以看到这里处理每一个 kv 时都会先获取锁,处理完后再释放锁,这样可以减小锁住的影响,在整理内存时让其它协程也能有机会获取到锁,执行查询和插入
func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()
clone.Ascend(func(keyi *keyIndex) bool {
ti.Lock()
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
_, ok := ti.tree.Delete(keyi)
}
ti.Unlock()
return true
})
return available
}
每一个 kv 的处理就是从前往后遍历它的 generation 数组,直到找到一个大于指定 revision 参数的版本;这里比较的是每个 generation 的最后一个版本,也就是还需要进一步定位到这个 generation 的 revs 数组中的具体版本
genIdx, g := 0, &ki.generations[0]
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].Main; tomb >= atRev {
break
}
genIdx++
g = &ki.generations[genIdx]
}
此时会从后往前的遍历这个 generation 的 revs 数组,直到找到第一个比指定 revision 参数小的版本;将这个版本保存在 available 中,为后面物理删除 boltdb 中的 kv 做准备
f := func(rev Revision) bool {
if rev.Main <= atRev {
available[rev] = struct{}{}
return false
}
return true
}
需要注意的是,那些已经被删除的 key ,如果它的所有版本都是小于指定 revision 参数的,那么会一直找到最新的那个空 generation;由于这是个空 generation,因此这个 key 不会被加入到 available 中;也就是后面物理删除的时候会被直接删除
定位到目标 generation 的目标 revs 后,直接删除之前的 generation 以及 revs 之前的那些版本
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
}
// remove the previous generations.
ki.generations = ki.generations[genIdx:]
Boltdb整理
内存 MVCC 删除完成后,在开始物理删除
直接 range 查询范围内的 kv ;这里会分批进行 range,每一批的大小是 1000;
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(1000))
初始的开始 key 是最小的全0;每一批执行完成后在进行更新,更新的就是上一批次的最后一个 kv 的下一个 sub +1
last := make([]byte, 8+1+8)
last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)
最终的结束 key 就是指定的 revision 参数;由于 range 查询是含左不含右,因此这里 +1 会把等于指定 revision 参数的那些 kv 也删除
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
range 出来的 kv ,如果在上述 available 中不存在的,那就是需要进行物理删除的,会直接从 Bucket 中删除这个 key
for i := range keys {
rev = BytesToRev(keys[i])
if _, ok := available [rev]; !ok {
tx.UnsafeDelete(schema.Key, keys[i])
keyCompactions++
}
}
等待所有批次都执行完成后,还会往 Meta 这个 Bucket 中写入kv ;key 就是 finishedCompactRev
,value 就是指定的 revision 参数;标志着这个版本的 compaction 完成
func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
rbytes := NewRevBytes()
rbytes = RevToBytes(Revision{Main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
}
整个 compaction 的执行过程中,可以看到即使上一次的 compaction 还没完成,也还是会正常触发这一次的 compaction ;
但是需要等待之前的任务完成,因为任务是通过 store 的先进先出队列调度的;但是如果是 compaction 执行过程中进程重启了,那么这次 compaction 会直接执行,不管上次的失败结果
s.fifoSched.Schedule(j)
defrag
defrag的主要目标是对 kv 进行物理删除,缩小 boltdb 的使用空间
默认情况下,defrag只会清理通过 --endpoints
指定的那些 etcd 节点;但是可以通过设置--cluster
来请求全部的集群节点
--cluster全局
--cluster
会触发一次线性读,然后再读取所有 Members
在前面 get 操作的时候,已经知道线性读会触发一次 leader 节点同步 commit index 到超过半数的 Followers
defrag影响
如果defrag的是全局节点,实际上也是每个节点依次顺序执行
defrag期间的影响:
如果 defrag 的是 Followers 节点,那么当前节点的读写都会阻塞,表现就是读当前节点会阻塞,同步 Leader 日志也会阻塞;相当于当前节点不可用,如果仍有超过半数节点存活,那么整个集群仍然可用
如果 defrag 的是 leader 节点,那么会触发集群的重新选主,因为 defrag 期间会锁住底层 boltdb ,Leader短暂不可用后重新选主;
因此在实际操作 defrag 时,应该尽量先 defrag Followers节点,最后在 defrag Leader 节点
开始defrag
首先锁住底层 boltdb ,这也是阻塞当前节点读写的原因
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()
然后创建一个临时文件;这个临时文件将作为新的 boltdb 文件
dir := filepath.Dir(b.db.Path())
temp, err := os.CreateTemp(dir, "db.tmp.*")
将当前节点中的 boltdb 事务提交,并不再开启新的事务
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
然后将旧数据库中的所有 Bucket 全部写入到新数据库中
- 操作都是通过事务的方式操作的, 开始旧数据库的只读事务,开始新数据库的读写事务,因为要往新数据库中写入数据
tmptx, err := tmpdb.Begin(true)
tx, err := odb.Begin(false)
- 循环读取旧数据库中的 Bucket,然后再新数据库中创建对应的 Bucket,并写入 Bucket 中的 kv 数据;每写入 10000 条数据后提交一次事务
c := tx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
if err = b.ForEach(func(k, v []byte) error {
count++
if count > 10000 {
err = tmptx.Commit()
tmptx, err = tmpdb.Begin(true)
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
count = 0
}
return tmpb.Put(k, v)
}); err != nil {
return err
}
}
- 最后再提交这两个事务即可
数据拷贝完成后,直接对临时文件进行重命名,临时文件就作为新的数据库文件了
err = os.Rename(tdbp, dbp)
最后再重新开启新数据库的读写事务,就完成了一次 defrag
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.reset()
b.readTx.tx = b.unsafeBegin(false)