本次代码阅读基于commit 189fdd3
1. raftwal
之前提到, etcd/raft
提供了 MemoryStorage
+ wal
的方式 来对 raft 中的 HardState, Snapshot 和 Entry 进行持久化. wal
将数据直接写入文件.
而对于 dgraph 来说, 它的一个物理节点上有多个 raft group, 且 raft group 会自动新建. 此时, 所有 raft group 使用同一套底层存储会相对简单一些.
本包中, dgraph 使用 badger 这个同属 dgraph-io 出品的 kv 数据库来保存所有 raft group 的日志.
1.1 Keys
既然不同 raft 的日志都存在同一个 kv 数据库中, 那么就需要对存储的 key 进行有效地区分.
对于一个 raft node 来说, 它通过节点 id RaftId(uint64)
和 组 id gid(uint32)
两层 来标识自己
相应地, raftwal 中的三类 key 都包含这两个 id
-
snapshotKey:
func (w *Wal) snapshotKey(gid uint32) []byte { b := make([]byte, 14) binary.BigEndian.PutUint64(b[0:8], w.id) copy(b[8:10], []byte("ss")) binary.BigEndian.PutUint32(b[10:14], gid) return b }
-
hardStateKey:
func (w *Wal) hardStateKey(gid uint32) []byte { b := make([]byte, 14) binary.BigEndian.PutUint64(b[0:8], w.id) copy(b[8:10], []byte("hs")) binary.BigEndian.PutUint32(b[10:14], gid) return b }
-
entryKey:
func (w *Wal) entryKey(gid uint32, term, idx uint64) []byte { b := make([]byte, 28) binary.BigEndian.PutUint64(b[0:8], w.id) binary.BigEndian.PutUint32(b[8:12], gid) binary.BigEndian.PutUint64(b[12:20], term) binary.BigEndian.PutUint64(b[20:28], idx) return b }
1.2 Wal
Wal 提供 raft 数据的读写.
对于 raft 数据的持久化, 最重要的是保证数据的一致性.
StoreSnapshot
func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
txn := w.wals.NewTransactionAt(1, true)
defer txn.Discard()
// ...
if err := txn.Set(w.snapshotKey(gid), data); err != nil {
return err
}
// ...
// 清除 snapshot 数据之前的所有 entry
// Delete all entries before this snapshot to save disk space.
start := w.entryKey(gid, 0, 0)
last := w.entryKey(gid, s.Metadata.Term, s.Metadata.Index)
// 这里利用了 badger 的特性, 在遍历的时候仅读取 key 数据, 减少了读取 value 带来的开销
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
itr := txn.NewIterator(opt)
defer itr.Close()
// 逐一删除不再需要的 entry
for itr.Seek(start); itr.Valid(); itr.Next() {
// ...
}
// Failure to delete entries is not a fatal error, so should be
// ok to ignore
if err := txn.CommitAt(1, nil); err != nil {
x.Printf("Error while storing snapshot %v\n", err)
return err
}
return nil
}
Store
// Store stores the hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, h raftpb.HardState, es []raftpb.Entry) error {
txn := w.wals.NewTransactionAt(1, true)
var t, i uint64
// 逐一保存 entry
for _, e := range es {
t, i = e.Term, e.Index
// ...
}
// 如果有必要, 保存 HardState
if !raft.IsEmptyHardState(h) {
// ...
}
// If we get no entries, then the default value of t and i would be zero. That would
// end up deleting all the previous valid raft entry logs. This check avoids that.
if t > 0 || i > 0 {
// When writing an Entry with Index i, any previously-persisted entries
// with Index >= i must be discarded.
// Ideally we should be deleting entries from previous term with index >= i,
// but to avoid complexity we remove them during reading from wal.
// 有可能出现某个时间点之后, 由于网络原因, 数据分叉的情形.
// 为了在网络恢复之后保证数据一致性, 对于每一批 entry, 需要清除逻辑上排在这批数据之后的 entry.
start := w.entryKey(gid, t, i+1)
prefix := w.prefix(gid)
// ...
// 逐一清除
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
// ...
}
}
if err := txn.CommitAt(1, nil); err != nil {
return err
}
return nil
}
读取
func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
// ...
}
func (w *Wal) HardState(gid uint32) (hd raftpb.HardState, rerr error) {
// ...
}
func (w *Wal) Entries(gid uint32, fromTerm, fromIndex uint64) (es []raftpb.Entry, rerr error) {
// ...
}
1.3 关于badger
badger 来源于这篇论文 WiscKey: Separating Keys from Values in SSD-conscious Storage. .
知乎上仅有的评论里, 对它的评价不甚高 如何评价 Badger (fast key-value storage) 😂.
但不论怎样, 它在一些情况下确实比较 快, 也可能非常适合 dgraph 的使用场景.
2. conn
conn
充当了 etcd/raft
的网络传输层, 基于 gRPC
在 raft 节点之间同步信息.
2.1 Pool
看名字是个连接池, 实际上其中的 *grpc.ClienctConn
是复用的.
一旦创建, 会每隔 10 秒尝试 ping 一下, 根据结果判断当前连接是否可用.
// "Pool" is used to manage the grpc client connection(s) for communicating with other
// worker instances. Right now it just holds one of them.
type Pool struct {
sync.RWMutex
// 这段注释说明了 *grpc.ClientConn 可以服用的原因
// A "pool" now consists of one connection. gRPC uses HTTP2 transport to combine
// messages in the same TCP stream.
conn *grpc.ClientConn
// 上一次 ping 请求成功的时间
lastEcho time.Time
// 目标节点的地址
Addr string
// 发起 ping 请求的 ticker
ticker *time.Ticker
}
2.2 Pools
Pools 维护了不同地址的 Pool.
这里是一个单例.
type Pools struct {
sync.RWMutex
all map[string]*Pool
}
var pi *Pools
func init() {
pi = new(Pools)
pi.all = make(map[string]*Pool)
}
2.3 Node
Node 的用于维护 raft 成员节点, 以及在各节点之间传输信息.职责包括:
初始化 / 读取 当前的 raft.Node
// SetRaft would set the provided raft.Node to this node.
// It would check fail if the node is already set.
func (n *Node) SetRaft(r raft.Node) {
// ...
}
// Raft would return back the raft.Node stored in the node.
func (n *Node) Raft() raft.Node {
// ...
}
维护 ConfState
即节点 id 列表
// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
// ...
}
// ConfState would return the latest ConfState stored in node.
func (n *Node) ConfState() *raftpb.ConfState {
// ...
}
维护 节点 id - 地址 的对应关系
func (n *Node) Peer(pid uint64) (string, bool) {
// ...
}
// addr must not be empty.
func (n *Node) SetPeer(pid uint64, addr string) {
// ...
}
func (n *Node) DeletePeer(pid uint64) {
// ...
}
// Connects the node and makes its peerPool refer to the constructed pool and address
// (possibly updating ourselves from the old address.) (Unless pid is ourselves, in which
// case this does nothing.)
func (n *Node) Connect(pid uint64, addr string) {
// ..
}
加入和移除节点
-
加入节点
// 这个函数是可以认为是 AddCluster 的回调 // 由使用者在收到 ConfChange 成功 apply 时主动调用 // dgraph 中调用这个方法的地方 err 都传入了 nil func (n *Node) DoneConfChange(id uint64, err error) { n.Lock() defer n.Unlock() ch, has := n.confChanges[id] if !has { return } delete(n.confChanges, id) ch <- err } func (n *Node) AddToCluster(ctx context.Context, pid uint64) error { addr, ok := n.Peer(pid) // ... rcBytes, err := rc.Marshal() // ... ch := make(chan error, 1) // 这个函数中, 将 channel 和一个随机生成的 id 映射起来 // 并在向其他节点同步的信息中带上这个 id id := n.storeConfChange(ch) err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{ ID: id, Type: raftpb.ConfChangeAddNode, NodeID: pid, Context: rcBytes, }) if err != nil { return err } // 等待 ConfChange apply 成功的回调 err = <-ch return err }
-
移除节点
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error { // ... // 和 AddToCluster 类似, 这里需要等待 ConfChange 完成 ch := make(chan error, 1) pid := n.storeConfChange(ch) err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{ ID: pid, Type: raftpb.ConfChangeRemoveNode, NodeID: id, }) // ... return err }
节点间同步信息
func (n *Node) Send(m raftpb.Message) {
// ...
select {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
// - -0 为什么这边会有 ignore... 仅仅是为了不阻塞调用者么?
default:
// ignore
}
}
// 所有通过 n.messages 传递的信息都会积累到一定程度后一起发送
// 发往同一个节点的信息也会整合
func (n *Node) BatchAndSendMessages() {
// 对同一个目标 id, 始终复用一个 *bytes.Buffer
batches := make(map[uint64]*bytes.Buffer)
for {
totalSize := 0
sm := <-n.messages
slurp_loop:
for {
// 如有必要, 初始化 *bytes.Buffer
var buf *bytes.Buffer
// ...
// 先将当前 data 的长度写入, 用做 message 之间的分隔.
// 再写入 data 本体
// 因此每条 message 占用 4 + len(sm.data)
totalSize += 4 + len(sm.data)
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
// 如果累积的数据量足够大, 中断此次汇集, 执行发送
if totalSize > messageBatchSoftLimit {
// We limit the batch size, but we aren't pushing back on
// n.messages, because the loop below spawns a goroutine
// to do its dirty work. This is good because right now
// (*node).send fails(!) if the channel is full.
break
}
// 如果没有新的 message 传入, 同样中断汇集执行发送
select {
case sm = <-n.messages:
default:
break slurp_loop
}
}
// 执行发送
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, data)
// 重置 buf 供下一轮 message 汇集循环使用
buf.Reset()
}
}
}
func (n *Node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 获取到指定节点的连接
addr, has := n.Peer(to)
pool, err := Get().Get(addr)
if !has || err != nil {
x.Printf("No healthy connection found to node Id: %d, err: %v\n", to, err)
// No such peer exists or we got handed a bogus config (bad addr), so we
// can't send messages to this peer.
return
}
client := pool.Get()
// ...
ch := make(chan error, 1)
go func() {
_, err = c.RaftMessage(ctx, p)
if err != nil {
x.Printf("Error while sending message to node Id: %d, err: %v\n", to, err)
}
ch <- err
}()
// 超时或发送完成
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
保存/恢复 raft 数据
func (n *Node) SaveSnapshot(s raftpb.Snapshot) {
// ...
}
func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry) {
// ...
}
func (n *Node) InitFromWal(wal *raftwal.Wal) (idx uint64, restart bool, rerr error) {
// ...
}
WaitForMinProposal
这里应该是 LinearRead 相关, 用来确认 Read 对应的 message 已经 apply
func (n *Node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
if read == nil || read.Ids == nil {
return nil
}
gid := n.RaftContext.Group
min := read.Ids[gid]
return n.Applied.WaitForMark(ctx, min)
}
2.4 RaftServer
RaftServer 是 gRPC service Raft 的实现, 内部是对 Node 的操作.