etcdctl put k v
etcdctl 作为 etcd 的客户端工具,通过他操作 kv 时,它会将命令转换为一个 grpc请求,发送到 etcd 的client监听端口
etcdctl put k v 对应的会请求grpc注册方法:/etcdserverpb.KV/Put
etcd收到这个 请求后,会把这个请求包装为一个 raft 请求,交给 raft 状态机处理,并返回 raft 处理结果
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}
转换 raft 请求
首先会检查当前 server状态,已经 commit 的index不能落后 apply index太多,commit意味着记录已经通过了 raft 提议,即被超过半数节点认可;apply意味着记录已完成了持久化,数据写入了 db 文件;如果当前 server 存在大量没有落盘的记录,就会拒绝后续请求
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+5000{
return nil, errors.ErrTooManyRequests
}
然后初始化一个随机请求ID
r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
}
请求序列化,检测序列化后的大小不能超过最大的请求大小,默认是1.5M
data, err := r.Marshal()
if len(data) > int(1.5M) {
return nil, errors.ErrRequestTooLarge
}
将请求注册到map中,等待 raft 处理完成后,再从这里通知处理结果
ch := s.w.Register(id)
func (w *list) Register(id uint64) <-chan any {
idx := id % 64
newCh := make(chan any, 1)
w.e[idx].l.Lock()
defer w.e[idx].l.Unlock()
if _, ok := w.e[idx].m[id]; !ok {
w.e[idx].m[id] = newCh
} else {
log.Panicf("dup id %x", id)
}
return newCh
}
像 raft 状态机提交这个请求
err = s.r.Propose(cctx, data)
阻塞等待 raft 处理结果
select {
case x := <-ch:
return x.(*apply2.Result), nil
}
提交 raft 请求
先将请求数据包装为一个消息,类型是pb.MsgProp
,请求数据作为 entry
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
将上述消息发送到 raft_node 的 prop_channel,即提议一条消息
propc: make(chan msgWithResult) ;prop_channel 是一个无缓冲的通道,通道的读写操作都会阻塞
对于不需要返回结果的消息,会直接返回
对于需要返回结果的会进一步等待这条消息被提交到 raft 状态机;注意只是等待这条消息被正常提议,而不是等待实际的请求执行结果,实际的执行结果还是通过上述注册Register的通道返回
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
ch := n.propc
pm := msgWithResult{m: m}
if wait {
pm.result = make(chan error, 1)
}
select {
case ch <- pm:
if !wait {
return nil
}
}
select {
case err := <-pm.result:
if err != nil {
return err
}
}
return nil
}
raft_node 会监听在 prop_channel 上,收到消息后会向 raft 提议这条消息,并返回提议结果,一般用于快速失败
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
raft 处理请求
Follower
如果这个请求是 Follower 收到的,Follower 是不能处理 pb.MsgProp 类型的消息的,需要把这个消息进一步转发给 Leader
switch m.Type {
case pb.MsgProp:
m.To = r.lead
r.send(m)
这里的发送也是简单的把这个消息追加到 msgs 消息列表里
r.msgs = append(r.msgs, m)
Step完成之后,自动进入下一次循环,此时会检测到 raft 状态机有变化,因为HasReady
方法里会检测 msgs 消息列表里是否存在消息;然后构建 Ready 信号发送到 ready_channel
for {
if advancec == nil && n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
select {
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
readyc = nil
上层 server 同时监听在 ready_channel ,将消息通过 Peer的Stream连接发送给Leader
case rd := <-r.readyc:
r.transport.Send(msgs)
Leader
Leader 是可以直接处理 pb.MsgProp 这个类型的消息的,他会先尝试把这个消息追加到本地,然后广播这些消息给所有Followers
switch m.Type {
case pb.MsgProp:
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
可以看到这个流程和当选 Leader 后发送的空 entry 的流程基本上是一致的,不同的是这里的 entry 是携带了 kv 数据的
因此这里也需要等待超过半数 Members 对这个 entry 进行 commit 确认;确认后 Leader 会先更新自己的 commit index;然后再将最新的 commit index 广播给所有 Followers,来推动 Followers 的 commit index 的更新
func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
l.committed = tocommit
}
}
r.bcastAppend()
以 Leader 为例,更新了 commit index 后,而 hard_state 是和 commit index 息息相关的
func (r *raft) hardState() pb.HardState {
return pb.HardState{
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
}
}
因此下一轮会通过 hard_state 检测到 Ready 信号再次就绪,而在 Ready 信号中,会通过 CommittedEntries 携带已经确认的那些 entry,读取的就是上一次 apply index 到 这次 commit index 之间的这些 entry;上述已经通过 raft 流程的 kv entry 就会在这里获取到
rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
Messages: r.msgs,
}
上层 server 通过 ready_channel 获取到 CommittedEntries后,会包装为一个 Apply 对象,并发送到 apply_channel
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
raftAdvancedC: raftAdvancedC,
}
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
而上层 server 中另外的协程来处理 apply_channel 中的数据
for {
select {
case ap := <-s.r.apply():
具体是通过一个 FIFO 的队列来顺序处理收到的 apply 消息,在没有消息的时候,FIFO 队列会一直阻塞,收到任务后开始顺序执行
sched := schedule.NewFIFOScheduler(lg)
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
在处理 apply 消息时,会按照顺序依次处理每一个 entry 数据
for i := range apply.entries {
switch e.Type {
case raftpb.EntryNormal:
// gofail: var beforeApplyOneEntryNormal struct{}
s.applyEntryNormal(&e, shouldApplyV3)
对于每一条 entry 数据,会先反序列化成 InternalRaftRequest 对象·
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
}
然后再次通过上述初始化的随机请求ID,来查询这个请求是否注册过了,注册过的就是需要返回结果的
id := raftReq.ID
needResult := s.w.IsRegistered(id)
然后在根据请求类型来分别处理,这里是 Put 请求,因此会走 Put 的逻辑
switch {
case r.Range != nil:
op = "Range"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
Put 数据
put 操作与前文中关于 MVCC 中的处理就完全一致了
Del 数据
删除数据 和 Put 数据的逻辑是完全一致的,也是通过追加 entry 的方式来经过完整的一遍 raft 流程,只不过 entry 中的数据是pb.InternalRaftRequest{DeleteRange: r}
, 即 Delete 请求数据
走完 raft 流程后,也是通过 apply 来更新 MVCC 和 boltdb 中的数据
数据的更新方式与前文中关于 MVCC 中的数据删除处理完全一致了
总结
Put 和 Del 操作都需要经历完整的 raft 流程,并且都是通过追加 entry 的方式发送 pb.MsgProp 消息
Followers 不处理 pb.MsgProp 类型的消息,而是转发给 Leader 处理
Leader 发起 raft 流程,并将消息广播给 Followers,等待超过半数 Members 对 entry 进行 commit
commit 之后的 entry 会通过 Ready 信号通知上层 server 进行 apply 操作,apply 操作会依次对 commit 的 entries 进行处理,每一个 entry 都会经历 MVCC 中的 Put 或者 Delete 完整流程