参数解析
etcdctl 中的 get 操作都会转换为 range 操作,即查询一个范围
--from-key
支持 from 查询,来查询字节顺序大于等于指定 key 的那些;这个参数等价于指定 range 的结束 key 为 []byte{0}
;意为直到最后
if len(key) == 0 {
key = "\x00"
}
return func(op *Op) {
if len(op.key) == 0 {
op.key = []byte{0}
}
op.end = []byte("\x00")
op.isOptsWithFromKey = true
}
通过 get 操作如果指定的 key 为空字符串,会自动转换为 []byte{0}
;意为从最小的 key 开始
需要注意的是 []byte{0}
和 []byte("\x00")
是等价的
--prefix
支持 prefix 查询,来查询以 key 开头的那些;这个操作相当于把指定的 key 作为开始 key,它的字节顺序的下一个 key 做为结束 key,因为 range 时候是含左不含右的,所以就能找到以指定 key 开头的那些了;代码实现上,就是遍历 key 的字节数组,从后往前找一个小于 OxFF
的,将其 +1 后截断做为结束 key ;相当于就是加法计算里的进位,无法进位时就会遍历到最后;如[]byte{0xff, 0xff}
这种无法进位的就会遍历到最后,而不是遍历到[]byte{0xff, 0xff, 0x01}
这个,因为这个也只是满足prefix=[]byte{0xff, 0xff}
的其中一个查询结果,而不是终止条件,可能还包含[]byte{0xff, 0xff, 0x02}
等
func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return end
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
return noPrefixEnd
}
End Key 的处理
在 range 查询的时候,对于指定了--from-key
或者 指定了--prefix
但无法进位时,会指定结束 key 为 []byte{0}
;意为直到最后
是因为在查询的时候,会对这种 key 进行转换;对于 []byte{0}
这个 key 会直接转换为一个空数组;这里的空数组主要是为了区别未指定结束 key 的情形
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}
如果是未指定结束 key , 那么结束 key 会是 nil 空;此时只需要查询开始 key 即可
if end == nil {
_, _, _, err := ti.unsafeGet(key, atRev)
if err != nil {
return 0
}
return 1
}
如果指定的是空数组,而不是 nil ;此时会从开始 key 开始遍历,由于 len(endi.key)
的长度为 0;所以这个方法会一直返回 true;进而一直遍历到最后
ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
if !f(item) {
return false
}
return true
})
线性读
Get操作默认是线性读,但是可以通过参数--consistency=s
来启用非线性读
这里的线性读指的是仅从 Leader 读取,不从 Followers 读取,而非线性读很明显就是还可以从 Followers 读取,相应的代价就是可能读到的不是最新的数据,因为在 apply 的时候,Followers 需要等待 Leader 把最新的 commit index 推送过来之后再开始 apply 流程;而 Leader 是在 index commit 之后就会直接开始 apply 流程,也就是 Leader 的apply 流程会稍微早于 Followers 的apply流程
在线性读的时候,每次读请求都会尝试先发送一个信号到 readwait_channel 来触发线性读,然后等待 readNotifier 中的信号,即读请求需要等待;可以注意到这里的 readwait_channel 是一个缓冲为 1 的通道 ,如果多个读请求同时到来,那么只会有一个读请求能正常写入数据到通道,其它读请求都直接跳过第一个 select,直接阻塞在第二个 select 上;也就是线性读在并发时只需要触发一次就可以了,多个读请求都阻塞等待 readState_channel 中的 信号,收到信号后可以一起往后执行
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
// signal linearizable loop for current notify if it hasn't been already
select {
case s.readwaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-readNotifier.c:
return nc.err
}
}
那么 readState_channel 是怎么激活的呢?上层 server 在启动时会注册一个协程来处理,这个协程会阻塞在 readwait_channel 上,当有读请求往这个通道写入数据后,这个协程被激活
select {
case <-s.readwaitc:
}
协程被激活后,会发送请求,请求类型是 pb.MsgReadIndex,携带的数据是随机的请求ID;即发起一次线性读请求到 raft
requestID := s.reqIDGen.Next()
n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
然后阻塞等待请求结果,等待的是 readState_channel 中的信号
for {
select {
case rs := <-s.r.readStateC:
requestIDBytes := uint64ToBigEndianBytes(requestID)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
if !gotOwnResponse {
slowReadIndex.Inc()
continue
}
return rs.Index, nil
raft 对于非 pb.MsgProp 类型的消息,会直接发送到 raft_node 的 recv_channel
if m.Type != pb.MsgProp {
select {
case n.recvc <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
raft_node 监听着 recv_channel ,收到消息后,直接把消息进行 raft 流程处理
case m := <-n.recvc:
r.Step(m)
Followers 处理
对于 Followers 来说,无法处理 pb.MsgReadIndex 类型的消息,会直接将消息发送给 Leader 处理;发送给 Leader 的消息会经过 Follower 的 raft 流程发送出去
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
Leader 处理
单节点场景
如果 etcd 是单节点运行的,那么当前节点自己就是 Leader ,那么上述的 pb.MsgReadIndex 消息也只能当前节点处理
这时候就只需要把这个消息追加到 readStates 数组中,等待上层 server 处理即可
if req.From == None || req.From == r.id {
r.readStates = append(r.readStates, ReadState{
Index: r.raftLog.committed,
RequestCtx: req.Entries[0].Data,
})
return pb.Message{}
}
多节点场景
多节点场景下的线性读更加复杂,需要等待 Leader 同步最新的 commit index 到超过半数 Members
首先该 pb.MsgReadIndex 消息会被保存在 Leader 本地
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
s := string(m.Entries[0].Data)
if _, ok := ro.pendingReadIndex[s]; ok {
return
}
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
ro.readIndexQueue = append(ro.readIndexQueue, s)
}
Leader 先对这条 pb.MsgReadIndex 消息进行确认
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
rs, ok := ro.pendingReadIndex[string(context)]
if !ok {
return nil
}
rs.acks[id] = true
return rs.acks
}
然后通过 HeartBeat 广播消息给所有 Followers,同时会携带 Leader 最新的 commit index,pr.Match 对应 Follower 返回给 Leader 的已确认 Match index ;context的内容就是上面的随机请求ID
commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
})
Followers 收到 pb.MsgHeartbeat 消息后,如果 Leader 指定的 commit index 更大,会先更新自己的 commit index;并响应给 Leader
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
Leader 收到 pb.MsgHeartbeatResp 类型消息后,会先进行票数统计,统计的就是 Leader 之前保存在本地的 ro.pendingReadIndex 中的 acks ;这里保存了每个 Member 对这个请求ID 的确认结果;如果票数没过半,那么继续等待其它 Followers 的响应
if r.trk.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
等待超过半数规则达成后,Leader 会清理本地缓存 ro.pendingReadIndex 中的在当前请求ID 之前的所有读请求,因为 raft 流程都是按顺序执行的,那么读请求也都是按顺序入队的,如果不同 Follower 的多个读请求复用了一次心跳广播,那么当前请求 ID 确认后,它之前的那些读请求也都可以确认了
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
var (
i int
found bool
)
ctx := string(m.Context)
var rss []*readIndexStatus
for _, okctx := range ro.readIndexQueue {
i++
rs, ok := ro.pendingReadIndex[okctx]
if !ok {
panic("cannot find corresponding read state from pending map")
}
rss = append(rss, rs)
if okctx == ctx {
found = true
break
}
}
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
return nil
}
这些读请求,每一个都会包装为一个 pb.MsgReadIndexResp 消息发送给 Followers;所有的这些读请求会都会添加到 msgs 数组中,一起发送出去
return pb.Message{
Type: pb.MsgReadIndexResp,
To: req.From,
Index: readIndex,
Entries: req.Entries,
}
发送 pb.MsgReadIndex 消息 的 Follower 首先这个响应消息后,会把这个消息中的请求ID 和 最新的 commit index 保存到 readStates 数组中
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
而如果 pb.MsgReadIndex 消息就是 Leader 自身发送的,即读请求直接发送给了 Leader ,Leader 也需要经历上述的一次完整的 HeartBeat 流程后,将 pb.MsgReadIndexResp 消息中的请求ID 和 最新的 commit index 保存到自己的 readStates 数组中
if req.From == None || req.From == r.id {
r.readStates = append(r.readStates, ReadState{
Index: readIndex,
RequestCtx: req.Entries[0].Data,
})
return pb.Message{}
}
上层 server
同时,Ready 信号也会检测 readStates 数组中是否存在元素,按照相同的方式通过 ready_channel 通知上层 server
上层 server 收到 ready 信号后,会将 ReadStates 数组中的最后一个元素发送到 readState_channel
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
case <-r.stopped:
return
}
}
如上面所说,上层 server 会注册单独的协程监听在 readState_channel 上,收到消息后,会检测消息中的请求ID 和 自己的请求ID 是否一致,如果不一致,可能是收到了之前已经超时的响应,那么需要继续等待,直到收到的请求ID 匹配
for {
select {
case rs := <-s.r.readStateC:
requestIDBytes := uint64ToBigEndianBytes(requestID)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
if !gotOwnResponse {
slowReadIndex.Inc()
continue
}
return rs.Index, nil
如果收到的 commit index 当前 Member 还没 apply ,那么还需要等待 apply 完成
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
apply 完成后,才会通知 readNotifier
readNotifier.notify(nil)
这时阻塞在线性读的那些读请求才能正常收到信号,表示可以正常开始正常读了
正常读取流程就和上文 MVCC 处理 range 读请求完全一致了
线性读批量触发
在开始线性读的时候,只会给 readNotifier 加只读锁,高并发场景下,可能多个读请求会公用一个 readNotifier
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
持有 readNotifier 之后,读请求会触发 readwait_channel ,来激活线性读协程
select {
case s.readwaitc <- struct{}{}:
default:
}
该协程收到 readwait_channel 信号后,会给 readNotifier 加独占锁;然后重新更新一个 readNotifier ;这样的效果就是,在加独占锁之前的那些读请求会共享同一个 readNotifier ;而加独占锁之后的那些读请求使用的是新的 readNotifier ,后面的这批读请求不会在本轮被触发,而是在下一轮被触发
select {
case <-s.readwaitc:
}
nextnr := newNotifier()
s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()
线性读请求一致后,这里触发的是旧的 readNotifier ,共享这一个 readNotifier 的读请求被正常触发开始读;而那些持有新的 readNotifier 的读请求需要等待下一轮,重新触发线性一致性后再被触发
nr.notify(nil)
总结
ETCD 中默认的读是线性读,非线性读性能更快,但是可能读到不是最新的数据
线性读如果发送到 Leader 节点,Leader 节点也需要通过 HeartBeat 广播一次最新的 commit index 给所有 Followers,等待超过半数 Followers 对 HeartBeat 响应后,才会允许线性读
线性读如果发送到 Follower 节点,Follower 节点需要先向 Leader 申请,即把请求转发给 Leader;然后由 Leader 先通过 HeartBeat 广播一次最新的 commit index 给所有 Followers,等待超过半数 Followers 对 HeartBeat 响应后,再响应给对应 Follower 可以开始线性读
高并发的线性读也只会触发一次 HeartBeat 广播,所有的读请求都等待这一轮的 HeartBeat 广播完成后,接收通知后一起开始正常读请求;这样可以避免过多的 HeartBeat 请求拖慢 raft 流程