WAL(Write Ahead Logging,预写式日志):存放预写式日志,最大的作用是记录了整个数据变化的全部历程。在 etcd 中,所有数据的修改在提交前,都要先写入到 WAL 中。.wal 文件的命名格式为 seq+idx。
WAL 机制使得 etcd 具备了以下两个功能:
故障快速恢复: 当你的数据遭到破坏时,就可以通过执行所有 WAL 中记录的修改操作,快速从最原始的数据恢复到数据损坏前的状态。
数据回滚(undo)/重做(redo):因为所有的修改操作都被记录在 WAL 中,需要回滚或重做时,只需要反向或正向执行日志中的操作即可。
主要方法
Create
先创建临时的 WAL 目录(dirpath + ".tmp")并在其中创建第一个 WAL 文件,对该文件上锁并预分配空间,写入初始记录后再将临时目录重命名为正式目录,使整个创建过程表现为原子操作
Save
核心功能,对小于预分配64MB的文件直接持久化日志和state;大于64MB的部分进行cut,分配(seq+1,index+1)的新名字,以此打开filepipeline预分配的文件进行下一步操作
Open
在指定index打开文件,读取该index后所有的日志信息
ReadAll
读取解码后的Record,主要包含以下几种类型:
- entryType: raft日志,占最多
- stateType: 存储term,vote,commit相关的状态信息
- metadataType: 元数据,WAL中需要保持一致
- crcType: 用于检验文件完整性
- snapshotType: 快照的index和term信息
Verify
用于检验文件是否被污染。每次cut64MB后都会更新crc,记录到文件中,从而能够核验其正确性
WAL 定义
// WAL is a logical representation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
// lg is the logger used for WAL diagnostics; Create substitutes a no-op
// logger when the caller passes nil.
lg *zap.Logger
dir string // the living directory of the underlay files
// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
start walpb.Snapshot // snapshot to start reading; reads begin at the position this snapshot identifies
decoder *decoder // decoder to decode records
readClose func() error // closer for decode reader
unsafeNoSync bool // if set, do not fsync
// NOTE(review): mu presumably guards the append-side fields declared below
// it; confirm against the methods that lock it.
mu sync.Mutex
enti uint64 // index of the last entry saved to the wal
encoder *encoder // encoder to encode records
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
// NOTE(review): fp looks like the source of preallocated segment files used
// when the WAL is cut; confirm against filePipeline.
fp *filePipeline
}
WAL 创建
// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll
// after the file is Open.
//
// The WAL is first built inside a sibling "<dirpath>.tmp" directory and then
// renamed into place so that initialization appears atomic. Create returns
// os.ErrExist when Exist(dirpath) reports true. A nil lg is replaced by a
// no-op logger. On any failure the returned *WAL is nil and the error is
// non-nil.
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
	if Exist(dirpath) {
		return nil, os.ErrExist
	}

	if lg == nil {
		lg = zap.NewNop()
	}

	// keep temporary wal directory so WAL initialization appears atomic:
	// all changes are made under ".tmp" first and the directory is renamed
	// once they are complete.
	tmpdirpath := filepath.Clean(dirpath) + ".tmp"
	if fileutil.Exist(tmpdirpath) {
		// Drop leftovers from a previous, interrupted attempt.
		if err := os.RemoveAll(tmpdirpath); err != nil {
			return nil, err
		}
	}
	defer os.RemoveAll(tmpdirpath)

	if err := fileutil.CreateDirAll(lg, tmpdirpath); err != nil {
		lg.Warn(
			"failed to create a temporary WAL directory",
			zap.String("tmp-dir-path", tmpdirpath),
			zap.String("dir-path", dirpath),
			zap.Error(err),
		)
		return nil, err
	}

	// path: dir/walname
	// walname: seq+index
	p := filepath.Join(tmpdirpath, walName(0, 0))
	// Create the first segment file and take a lock on it.
	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
	if err != nil {
		lg.Warn(
			"failed to flock an initial WAL file",
			zap.String("path", p),
			zap.Error(err),
		)
		return nil, err
	}

	// Until the file is handed over to renameWAL it is owned solely by this
	// call, so release it if initialization fails before that point. This
	// defer is registered after the RemoveAll above and therefore runs first,
	// closing the file before its directory is removed.
	releaseFile := true
	defer func() {
		if releaseFile {
			// Best effort: the initialization error is the one the caller needs.
			_ = f.Close()
		}
	}()

	// Move to the end of the file.
	if _, err = f.Seek(0, io.SeekEnd); err != nil {
		lg.Warn(
			"failed to seek an initial WAL file",
			zap.String("path", p),
			zap.Error(err),
		)
		return nil, err
	}
	// Preallocate SegmentSizeBytes for the segment.
	if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
		lg.Warn(
			"failed to preallocate an initial WAL file",
			zap.String("path", p),
			zap.Int64("segment-bytes", SegmentSizeBytes),
			zap.Error(err),
		)
		return nil, err
	}

	// Build the WAL, attach an encoder and write the initial records
	// (crc, metadata, empty snapshot).
	w := &WAL{
		lg:       lg,
		dir:      dirpath,
		metadata: metadata,
	}
	w.encoder, err = newFileEncoder(f.File, 0)
	if err != nil {
		return nil, err
	}
	// Track the locked file in locks (the set of files this WAL holds locked).
	w.locks = append(w.locks, f)
	if err = w.saveCrc(0); err != nil {
		return nil, err
	}
	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
		return nil, err
	}
	if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
		return nil, err
	}

	// Rename the ".tmp" directory into place. From here on the file's
	// lifetime is managed through w, exactly as before this change.
	releaseFile = false
	logDirPath := w.dir
	if w, err = w.renameWAL(tmpdirpath); err != nil {
		lg.Warn(
			"failed to rename the temporary WAL directory",
			zap.String("tmp-dir-path", tmpdirpath),
			zap.String("dir-path", logDirPath),
			zap.Error(err),
		)
		return nil, err
	}

	// perr records any failure after the rename; when it is set, the deferred
	// hook runs cleanupWAL on the renamed WAL.
	var perr error
	defer func() {
		if perr != nil {
			w.cleanupWAL(lg)
		}
	}()

	// directory was renamed; sync parent dir to persist rename
	pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
	if perr != nil {
		lg.Warn(
			"failed to open the parent data directory",
			zap.String("parent-dir-path", filepath.Dir(w.dir)),
			zap.String("dir-path", w.dir),
			zap.Error(perr),
		)
		return nil, perr
	}
	// dirCloser closes the parent directory handle. It reports a close
	// failure only through its return value and never writes to perr, so
	// calling it on an error path cannot erase the error being handled.
	dirCloser := func() error {
		cerr := pdir.Close()
		if cerr != nil {
			lg.Warn(
				"failed to close the parent data directory file",
				zap.String("parent-dir-path", filepath.Dir(w.dir)),
				zap.String("dir-path", w.dir),
				zap.Error(cerr),
			)
		}
		return cerr
	}
	start := time.Now()
	// Fsync the parent directory so the rename is persisted.
	if perr = fileutil.Fsync(pdir); perr != nil {
		// Closing is best effort here (dirCloser logs its own failure); the
		// fsync error is the one to report.
		_ = dirCloser()
		lg.Warn(
			"failed to fsync the parent data directory file",
			zap.String("parent-dir-path", filepath.Dir(w.dir)),
			zap.String("dir-path", w.dir),
			zap.Error(perr),
		)
		return nil, perr
	}
	walFsyncSec.Observe(time.Since(start).Seconds())
	// Assign to perr so that a close failure still triggers the deferred cleanup.
	if perr = dirCloser(); perr != nil {
		return nil, perr
	}

	return w, nil
}