ETCD《六》--Put/Del操作

etcdctl put k v

etcdctl 作为 etcd 的客户端工具,通过他操作 kv 时,它会将命令转换为一个 grpc请求,发送到 etcd 的client监听端口

etcdctl put k v 对应的会请求grpc注册方法:/etcdserverpb.KV/Put

etcd收到这个 请求后,会把这个请求包装为一个 raft 请求,交给 raft 状态机处理,并返回 raft 处理结果

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

转换 raft 请求

首先会检查当前 server状态,已经 commit 的index不能落后 apply index太多,commit意味着记录已经通过了 raft 提议,即被超过半数节点认可;apply意味着记录已完成了持久化,数据写入了 db 文件;如果当前 server 存在大量没有落盘的记录,就会拒绝后续请求

ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+5000{
        return nil, errors.ErrTooManyRequests
    }

然后初始化一个随机请求ID

r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

请求序列化,检测序列化后的大小不能超过最大的请求大小,默认是1.5M

data, err := r.Marshal()

if len(data) > int(1.5M) {
        return nil, errors.ErrRequestTooLarge
    }

将请求注册到map中,等待 raft 处理完成后,再从这里通知处理结果

ch := s.w.Register(id)

func (w *list) Register(id uint64) <-chan any {
    idx := id % 64
    newCh := make(chan any, 1)
    w.e[idx].l.Lock()
    defer w.e[idx].l.Unlock()
    if _, ok := w.e[idx].m[id]; !ok {
        w.e[idx].m[id] = newCh
    } else {
        log.Panicf("dup id %x", id)
    }
    return newCh
}

像 raft 状态机提交这个请求

err = s.r.Propose(cctx, data)

阻塞等待 raft 处理结果

select {
    case x := <-ch:
        return x.(*apply2.Result), nil
    }

提交 raft 请求

先将请求数据包装为一个消息,类型是pb.MsgProp,请求数据作为 entry

func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

将上述消息发送到 raft_node 的 prop_channel,即提议一条消息

  • propc: make(chan msgWithResult) ;prop_channel 是一个无缓冲的通道,通道的读写操作都会阻塞

  • 对于不需要返回结果的消息,会直接返回

  • 对于需要返回结果的会进一步等待这条消息被提交到 raft 状态机;注意只是等待这条消息被正常提议,而不是等待实际的请求执行结果,实际的执行结果还是通过上述注册Register的通道返回

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    
    ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }
    
    }
    select {
    case err := <-pm.result:
        if err != nil {
            return err
        }
    }
    return nil
}

raft_node 会监听在 prop_channel 上,收到消息后会向 raft 提议这条消息,并返回提议结果,一般用于快速失败

case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

raft 处理请求

Follower

如果这个请求是 Follower 收到的,Follower 是不能处理 pb.MsgProp 类型的消息的,需要把这个消息进一步转发给 Leader

switch m.Type {
    case pb.MsgProp:
        m.To = r.lead
        r.send(m)

这里的发送也是简单的把这个消息追加到 msgs 消息列表里

r.msgs = append(r.msgs, m)

Step完成之后,自动进入下一次循环,此时会检测到 raft 状态机有变化,因为HasReady方法里会检测 msgs 消息列表里是否存在消息;然后构建 Ready 信号发送到 ready_channel

for {
        if advancec == nil && n.rn.HasReady() {
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }

        select {
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
        case readyc <- rd:
            n.rn.acceptReady(rd)
            advancec = n.advancec
            readyc = nil

上层 server 同时监听在 ready_channel ,将消息通过 Peer的Stream连接发送给Leader

case rd := <-r.readyc:
                    r.transport.Send(msgs)

Leader

Leader 是可以直接处理 pb.MsgProp 这个类型的消息的,他会先尝试把这个消息追加到本地,然后广播这些消息给所有Followers

switch m.Type {
    case pb.MsgProp:
        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()
        return nil

可以看到这个流程和当选 Leader 后发送的空 entry 的流程基本上是一致的,不同的是这里的 entry 是携带了 kv 数据的

因此这里也需要等待超过半数 Members 对这个 entry 进行 commit 确认;确认后 Leader 会先更新自己的 commit index;然后再将最新的 commit index 广播给所有 Followers,来推动 Followers 的 commit index 的更新

func (l *raftLog) commitTo(tocommit uint64) {
    // never decrease commit
    if l.committed < tocommit {
        l.committed = tocommit
    }
}

r.bcastAppend()

以 Leader 为例,更新了 commit index 后,而 hard_state 是和 commit index 息息相关的

func (r *raft) hardState() pb.HardState {
    return pb.HardState{
        Term:   r.Term,
        Vote:   r.Vote,
        Commit: r.raftLog.committed,
    }
}

因此下一轮会通过 hard_state 检测到 Ready 信号再次就绪,而在 Ready 信号中,会通过 CommittedEntries 携带已经确认的那些 entry,读取的就是上一次 apply index 到 这次 commit index 之间的这些 entry;上述已经通过 raft 流程的 kv entry 就会在这里获取到

rd := Ready{
        Entries:          r.raftLog.nextUnstableEnts(),
        CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
        Messages:         r.msgs,
    }

上层 server 通过 ready_channel 获取到 CommittedEntries后,会包装为一个 Apply 对象,并发送到 apply_channel

ap := toApply{
                    entries:       rd.CommittedEntries,
                    snapshot:      rd.Snapshot,
                    notifyc:       notifyc,
                    raftAdvancedC: raftAdvancedC,
                }

select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

而上层 server 中另外的协程来处理 apply_channel 中的数据

for {
        select {
        case ap := <-s.r.apply():

具体是通过一个 FIFO 的队列来顺序处理收到的 apply 消息,在没有消息的时候,FIFO 队列会一直阻塞,收到任务后开始顺序执行

sched := schedule.NewFIFOScheduler(lg)

f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)

在处理 apply 消息时,会按照顺序依次处理每一个 entry 数据

for i := range apply.entries {
        switch e.Type {
        case raftpb.EntryNormal:
            // gofail: var beforeApplyOneEntryNormal struct{}
            s.applyEntryNormal(&e, shouldApplyV3)

对于每一条 entry 数据,会先反序列化成 InternalRaftRequest 对象·

var raftReq pb.InternalRaftRequest
    if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
        
    }

然后再次通过上述初始化的随机请求ID,来查询这个请求是否注册过了,注册过的就是需要返回结果的

id := raftReq.ID
needResult := s.w.IsRegistered(id)

然后在根据请求类型来分别处理,这里是 Put 请求,因此会走 Put 的逻辑

switch {
    case r.Range != nil:
        op = "Range"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
    case r.Put != nil:
        op = "Put"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
    case r.DeleteRange != nil:
        op = "DeleteRange"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)

Put 数据

put 操作与前文中关于 MVCC 中的处理就完全一致了

Del 数据

删除数据 和 Put 数据的逻辑是完全一致的,也是通过追加 entry 的方式来经过完整的一遍 raft 流程,只不过 entry 中的数据是pb.InternalRaftRequest{DeleteRange: r} , 即 Delete 请求数据

走完 raft 流程后,也是通过 apply 来更新 MVCC 和 boltdb 中的数据

数据的更新方式与前文中关于 MVCC 中的数据删除处理完全一致了

总结

  • Put 和 Del 操作都需要经历完整的 raft 流程,并且都是通过追加 entry 的方式发送 pb.MsgProp 消息

  • Followers 不处理 pb.MsgProp 类型的消息,而是转发给 Leader 处理

  • Leader 发起 raft 流程,并将消息广播给 Followers,等待超过半数 Members 对 entry 进行 commit

  • commit 之后的 entry 会通过 Ready 信号通知上层 server 进行 apply 操作,apply 操作会依次对 commit 的 entries 进行处理,每一个 entry 都会经历 MVCC 中的 Put 或者 Delete 完整流程

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容