ETCD《七》--Get操作

参数解析

etcdctl 中的 get 操作都会转换为 range 操作,即查询一个范围

--from-key 支持 from 查询,来查询字节顺序大于等于指定 key 的那些;这个参数等价于指定 range 的结束 key 为 []byte{0};意为直到最后

if len(key) == 0 {
            key = "\x00"
        }
return func(op *Op) {
        if len(op.key) == 0 {
            op.key = []byte{0}
        }
        op.end = []byte("\x00")
        op.isOptsWithFromKey = true
    }

通过 get 操作如果指定的 key 为空字符串,会自动转换为 []byte{0};意为从最小的 key 开始

需要注意的是 []byte{0}[]byte("\x00") 是等价的

--prefix 支持 prefix 查询,来查询以 key 开头的那些;这个操作相当于把指定的 key 作为开始 key,它的字节顺序的下一个 key 做为结束 key,因为 range 时候是含左不含右的,所以就能找到以指定 key 开头的那些了;代码实现上,就是遍历 key 的字节数组,从后往前找一个小于 OxFF的,将其 +1 后截断做为结束 key ;相当于就是加法计算里的进位,无法进位时就会遍历到最后;如[]byte{0xff, 0xff}这种无法进位的就会遍历到最后,而不是遍历到[]byte{0xff, 0xff, 0x01}这个,因为这个也只是满足prefix=[]byte{0xff, 0xff}的其中一个查询结果,而不是终止条件,可能还包含[]byte{0xff, 0xff, 0x02}

func getPrefix(key []byte) []byte {
    end := make([]byte, len(key))
    copy(end, key)
    for i := len(end) - 1; i >= 0; i-- {
        if end[i] < 0xff {
            end[i] = end[i] + 1
            end = end[:i+1]
            return end
        }
    }
    // next prefix does not exist (e.g., 0xffff);
    // default to WithFromKey policy
    return noPrefixEnd
}

End Key 的处理

在 range 查询的时候,对于指定了--from-key 或者 指定了--prefix 但无法进位时,会指定结束 key 为 []byte{0};意为直到最后

是因为在查询的时候,会对这种 key 进行转换;对于 []byte{0} 这个 key 会直接转换为一个空数组;这里的空数组主要是为了区别未指定结束 key 的情形

func mkGteRange(rangeEnd []byte) []byte {
    if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
        return []byte{}
    }
    return rangeEnd
}

如果是未指定结束 key , 那么结束 key 会是 nil 空;此时只需要查询开始 key 即可

if end == nil {
        _, _, _, err := ti.unsafeGet(key, atRev)
        if err != nil {
            return 0
        }
        return 1
    }

如果指定的是空数组,而不是 nil ;此时会从开始 key 开始遍历,由于 len(endi.key) 的长度为 0;所以这个方法会一直返回 true;进而一直遍历到最后

ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
        if len(endi.key) > 0 && !item.Less(endi) {
            return false
        }
        if !f(item) {
            return false
        }
        return true
    })

线性读

Get操作默认是线性读,但是可以通过参数--consistency=s 来启用非线性读

这里的线性读指的是仅从 Leader 读取,不从 Followers 读取,而非线性读很明显就是还可以从 Followers 读取,相应的代价就是可能读到的不是最新的数据,因为在 apply 的时候,Followers 需要等待 Leader 把最新的 commit index 推送过来之后再开始 apply 流程;而 Leader 是在 index commit 之后就会直接开始 apply 流程,也就是 Leader 的apply 流程会稍微早于 Followers 的apply流程

在线性读的时候,每次读请求都会尝试先发送一个信号到 readwait_channel 来触发线性读,然后等待 readNotifier 中的信号,即读请求需要等待;可以注意到这里的 readwait_channel 是一个缓冲为 1 的通道 ,如果多个读请求同时到来,那么只会有一个读请求能正常写入数据到通道,其它读请求都直接跳过第一个 select,直接阻塞在第二个 select 上;也就是线性读在并发时只需要触发一次就可以了,多个读请求都阻塞等待 readState_channel 中的 信号,收到信号后可以一起往后执行

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
    
    // signal linearizable loop for current notify if it hasn't been already
    select {
    case s.readwaitc <- struct{}{}:
    default:
    }

    // wait for read state notification
    select {
    case <-readNotifier.c:
        return nc.err
    }
}

那么 readState_channel 是怎么激活的呢?上层 server 在启动时会注册一个协程来处理,这个协程会阻塞在 readwait_channel 上,当有读请求往这个通道写入数据后,这个协程被激活

        select {
        case <-s.readwaitc:
        }

协程被激活后,会发送请求,请求类型是 pb.MsgReadIndex,携带的数据是随机的请求ID;即发起一次线性读请求到 raft

requestID := s.reqIDGen.Next()
n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})

然后阻塞等待请求结果,等待的是 readState_channel 中的信号

for {
        select {
        case rs := <-s.r.readStateC:
            requestIDBytes := uint64ToBigEndianBytes(requestID)
            gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
            if !gotOwnResponse {
                
                slowReadIndex.Inc()
                continue
            }
            return rs.Index, nil

raft 对于非 pb.MsgProp 类型的消息,会直接发送到 raft_node 的 recv_channel

if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }

raft_node 监听着 recv_channel ,收到消息后,直接把消息进行 raft 流程处理

case m := <-n.recvc:
            
            r.Step(m)

Followers 处理

对于 Followers 来说,无法处理 pb.MsgReadIndex 类型的消息,会直接将消息发送给 Leader 处理;发送给 Leader 的消息会经过 Follower 的 raft 流程发送出去

case pb.MsgReadIndex:
        if r.lead == None {
            r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
            return nil
        }
        m.To = r.lead
        r.send(m)

Leader 处理

单节点场景

如果 etcd 是单节点运行的,那么当前节点自己就是 Leader ,那么上述的 pb.MsgReadIndex 消息也只能当前节点处理

这时候就只需要把这个消息追加到 readStates 数组中,等待上层 server 处理即可

if req.From == None || req.From == r.id {
        r.readStates = append(r.readStates, ReadState{
            Index:      r.raftLog.committed,
            RequestCtx: req.Entries[0].Data,
        })
        return pb.Message{}
    }

多节点场景

多节点场景下的线性读更加复杂,需要等待 Leader 同步最新的 commit index 到超过半数 Members

首先该 pb.MsgReadIndex 消息会被保存在 Leader 本地

func (ro *readOnly) addRequest(index uint64, m pb.Message) {
    s := string(m.Entries[0].Data)
    if _, ok := ro.pendingReadIndex[s]; ok {
        return
    }
    ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
    ro.readIndexQueue = append(ro.readIndexQueue, s)
}

Leader 先对这条 pb.MsgReadIndex 消息进行确认

func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
    rs, ok := ro.pendingReadIndex[string(context)]
    if !ok {
        return nil
    }

    rs.acks[id] = true
    return rs.acks
}

然后通过 HeartBeat 广播消息给所有 Followers,同时会携带 Leader 最新的 commit index,pr.Match 对应 Follower 返回给 Leader 的已确认 Match index ;context的内容就是上面的随机请求ID

commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    })

Followers 收到 pb.MsgHeartbeat 消息后,如果 Leader 指定的 commit index 更大,会先更新自己的 commit index;并响应给 Leader

func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Leader 收到 pb.MsgHeartbeatResp 类型消息后,会先进行票数统计,统计的就是 Leader 之前保存在本地的 ro.pendingReadIndex 中的 acks ;这里保存了每个 Member 对这个请求ID 的确认结果;如果票数没过半,那么继续等待其它 Followers 的响应

if r.trk.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }

等待超过半数规则达成后,Leader 会清理本地缓存 ro.pendingReadIndex 中的在当前请求ID 之前的所有读请求,因为 raft 流程都是按顺序执行的,那么读请求也都是按顺序入队的,如果不同 Follower 的多个读请求复用了一次心跳广播,那么当前请求 ID 确认后,它之前的那些读请求也都可以确认了

func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
    var (
        i     int
        found bool
    )

    ctx := string(m.Context)
    var rss []*readIndexStatus

    for _, okctx := range ro.readIndexQueue {
        i++
        rs, ok := ro.pendingReadIndex[okctx]
        if !ok {
            panic("cannot find corresponding read state from pending map")
        }
        rss = append(rss, rs)
        if okctx == ctx {
            found = true
            break
        }
    }

    if found {
        ro.readIndexQueue = ro.readIndexQueue[i:]
        for _, rs := range rss {
            delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
        }
        return rss
    }

    return nil
}

这些读请求,每一个都会包装为一个 pb.MsgReadIndexResp 消息发送给 Followers;所有的这些读请求会都会添加到 msgs 数组中,一起发送出去

return pb.Message{
        Type:    pb.MsgReadIndexResp,
        To:      req.From,
        Index:   readIndex,
        Entries: req.Entries,
    }

发送 pb.MsgReadIndex 消息 的 Follower 首先这个响应消息后,会把这个消息中的请求ID 和 最新的 commit index 保存到 readStates 数组中

case pb.MsgReadIndexResp:
        if len(m.Entries) != 1 {
            r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
            return nil
        }
        r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})

而如果 pb.MsgReadIndex 消息就是 Leader 自身发送的,即读请求直接发送给了 Leader ,Leader 也需要经历上述的一次完整的 HeartBeat 流程后,将 pb.MsgReadIndexResp 消息中的请求ID 和 最新的 commit index 保存到自己的 readStates 数组中

if req.From == None || req.From == r.id {
        r.readStates = append(r.readStates, ReadState{
            Index:      readIndex,
            RequestCtx: req.Entries[0].Data,
        })
        return pb.Message{}
    }

上层 server

同时,Ready 信号也会检测 readStates 数组中是否存在元素,按照相同的方式通过 ready_channel 通知上层 server

上层 server 收到 ready 信号后,会将 ReadStates 数组中的最后一个元素发送到 readState_channel

if len(rd.ReadStates) != 0 {
                    select {
                    case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                    case <-time.After(internalTimeout):
                        r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
                    case <-r.stopped:
                        return
                    }
                }

如上面所说,上层 server 会注册单独的协程监听在 readState_channel 上,收到消息后,会检测消息中的请求ID 和 自己的请求ID 是否一致,如果不一致,可能是收到了之前已经超时的响应,那么需要继续等待,直到收到的请求ID 匹配

for {
        select {
        case rs := <-s.r.readStateC:
            requestIDBytes := uint64ToBigEndianBytes(requestID)
            gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
            if !gotOwnResponse {
                
                slowReadIndex.Inc()
                continue
            }
            return rs.Index, nil

如果收到的 commit index 当前 Member 还没 apply ,那么还需要等待 apply 完成

if appliedIndex < confirmedIndex {
            select {
            case <-s.applyWait.Wait(confirmedIndex):
            case <-s.stopping:
                return
            }
        }

apply 完成后,才会通知 readNotifier

readNotifier.notify(nil)

这时阻塞在线性读的那些读请求才能正常收到信号,表示可以正常开始正常读了

正常读取流程就和上文 MVCC 处理 range 读请求完全一致了

线性读批量触发

在开始线性读的时候,只会给 readNotifier 加只读锁,高并发场景下,可能多个读请求会公用一个 readNotifier

s.readMu.RLock()
    nc := s.readNotifier
    s.readMu.RUnlock()

持有 readNotifier 之后,读请求会触发 readwait_channel ,来激活线性读协程

select {
    case s.readwaitc <- struct{}{}:
    default:
    }

该协程收到 readwait_channel 信号后,会给 readNotifier 加独占锁;然后重新更新一个 readNotifier ;这样的效果就是,在加独占锁之前的那些读请求会共享同一个 readNotifier ;而加独占锁之后的那些读请求使用的是新的 readNotifier ,后面的这批读请求不会在本轮被触发,而是在下一轮被触发

select {    
        case <-s.readwaitc:
        }

        nextnr := newNotifier()
        s.readMu.Lock()
        nr := s.readNotifier
        s.readNotifier = nextnr
        s.readMu.Unlock()

线性读请求一致后,这里触发的是旧的 readNotifier ,共享这一个 readNotifier 的读请求被正常触发开始读;而那些持有新的 readNotifier 的读请求需要等待下一轮,重新触发线性一致性后再被触发

nr.notify(nil)

总结

  • ETCD 中默认的读是线性读,非线性读性能更快,但是可能读到不是最新的数据

  • 线性读如果发送到 Leader 节点,Leader 节点也需要通过 HeartBeat 广播一次最新的 commit index 给所有 Followers,等待超过半数 Followers 对 HeartBeat 响应后,才会允许线性读

  • 线性读如果发送到 Follower 节点,Follower 节点需要先向 Leader 申请,即把请求转发给 Leader;然后由 Leader 先通过 HeartBeat 广播一次最新的 commit index 给所有 Followers,等待超过半数 Followers 对 HeartBeat 响应后,再响应给对应 Follower 可以开始线性读

  • 高并发的线性读也只会触发一次 HeartBeat 广播,所有的读请求都等待这一轮的 HeartBeat 广播完成后,接收通知后一起开始正常读请求;这样可以避免过多的 HeartBeat 请求拖慢 raft 流程

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容