Peer监听
Peer 监听通常监听在 2380 端口上,用于提供 Peer 之间的 https 服务
注册的一些服务端点:
/raft:raftHandler处理
/members:peerMembersHandler处理
/members/promote/:peerMemberPromoteHandler处理
/leases、/leases/internal:leaseHandler处理
/downgrade/enabled:downgradeEnabledHandler处理
/members/hashkv:hashKVHandler处理
/version:versionHandler处理
Raft Handler
Raft Handler有可以进一步划分为
/raft:pipelineHandler处理
/raft/stream/:streamHandler处理
/raft/snapshot:snapHandler处理
/raft/probing:httpHealth处理
raft handler在初始化的时候会创建两类 Transport,一种是pipeline transport, 另一种是 stream transport
func (t *Transport) Start() error {
var err error
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
return nil
}
stream transport:长连接不断开,Peer 之间相互推送的方式
pipeline transport:短链接,请求一次,响应一次;发送 snapshot 或者 stream transport 不可用时才会使用这个;snapshot 也使用这个主要考虑是 snapshot 较大 ,避免阻塞其他消息
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
Stream Handler
每个 Member 启动的时候都会相互添加 Peer ,每个 Peer 都会维护一个写通道、一个读通道
读协程
读协程的初始化
p.msgAppReader = &streamReader{
lg: t.Logger,
peerID: peerID,
typ: "message",
tr: t,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(t.DialRetryFrequency, 1),
}
读通道启动的时候就会尝试 dial 对应的 Peer 地址
rc, err := cr.dial(t)
dial 实际上就是向 Peer 发送请求 GET /raft/stream/message/local_member_id
,使用的就是 stream transport
uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
resp, err := cr.tr.streamRt.RoundTrip(req)
Peer 端已经注册了 /raft/stream/ 开头的请求由 streamHandler 处理,这个处理器会先返回200状态码给发送方
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
然后将当前连接发送给写通道,当前的 Writer 就是 ResponseWriter , Flusher 用于将写入到 Response 缓冲区的消息强制推送给对方
c := newCloseNotifier()
conn := &outgoingConn{
t: t,
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,
localID: h.tr.ID,
peerID: from,
}
select {
case streamWWriter.connc <- conn:
return true
case <-cw.done:
return false
}
最后将连接保持不断开,连接两端的消息依靠 Flusher 来推送
<-c.closeNotify()
而发送方接收到对方推送过来的 200 状态码后,会不断尝试从 Response 中读取数据,读到数据后就进行反序列化,再将反序列化后的消息发送到 recv_channel 或者 prop_channel
switch resp.StatusCode {
case http.StatusOK:
return resp.Body, nil
for {
m, err := dec.decode()
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
而这两个 channel 都有专门的协程监听,而这里的 Process 就是通过 step 方法将消息流入到 raft 流程里
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
}
case <-p.stopc:
return
}
}
}()
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.r.Step(ctx, m)
}
写协程
写协程的初始化
w := &streamWriter{
lg: lg,
localID: local,
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, 4096),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
写通道在启动的时候会监听在conn_channel 上,而读通道在在启动的时候会将连接发送到这个通道,这里写通道就能正常收到这个连接,并完成一些初始化功能,比如将 working 设置为true,意味着 stream transport 是正常工作的,那么在 Peer 之间相互发送消息的时候就会优先选择 stream transport,而不会退化到 pipeline transport;同时再初始化 msg_channel,意味着可以开始接收消息了
case conn := <-cw.connc:
cw.working = true
heartbeatc, msgc = tickc.C, cw.msgc
在 Peer 之间相互发送消息的时候,首先会挑选一个写入通道,正如前面所说,发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport,其它消息或者 stream transport 正常初始化时都会使用 stream 的写入通道
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
发送时直接向消息写入到 pick 到的写通道里
writec, name := p.pick(m)
select {
case writec <- m:
而写协程同时也监听着 msg_channel ,对于需要发送给 Peer 的消息会先编码,再选择合适的时机推送给 Peer ,如没有更多消息 或者 消息累计到了 缓冲区的一半,就会通过 Flusher 将 Response 缓冲区的消息强制推送给对方
case m := <-msgc:
err := enc.encode(&m)
if err == nil {
unflushed += m.Size()
if len(msgc) == 0 || batched > 4096 / 2 {
flusher.Flush()
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
batched = 0
} else {
batched++
}
continue
}
Pipeline Handler
相比较之下,pipeline handler 的工作原理就简单很多了;按照 pick 的规则,发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport
此时需要发送给 Peer 的消息,就会 pick 到这个 pipeline 的写通道,而 pipeline 在启动时会监听在这个写通道上,收到消息后就构建 HTTP 请求发送给 Peer
func (p *pipeline) handle() {
defer p.wg.Done()
for {
select {
case m := <-p.msgc:
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))
这里构建的是 HTTP POST请求,请求的就是 POST /raft
路径,使用的就是 pipeline transport
req, err := http.NewRequest(http.MethodPost, "/raft", body)
req.Header.Set("Content-Type", "application/protobuf")
resp, err := p.tr.pipelineRt.RoundTrip(req)
正如前面所说,POST /raft
路径的请求会交给 pipelineHandler处理,pipelineHandler 的处理也很简单,直接通过 Process 调用 step 方法将消息发给 raft 流程处理
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := h.r.Process(context.TODO(), m); err != nil {
小结
每个 Member 都会 维护到其它 Peers 的两个连接,一种是pipeline transport, 另一种是 stream transport;发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport;否则 raft 相关的消息都会通过 stream transport 发送
-
每个 Member 到每个 Peer 都会维护两个协程:
一个读协程,会不断从 stream transport 中读取数据,然后发送到 recv_channel 或者 prop_channel,而这两个通道都会通过 step 方法将消息发送给 raft 流程处理
一个写协程,需要发送给 Peer 的消息,首先会 pick 一个写通道,pick 的原则就是 发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport,否则都会 pick 到 stream transport;然后将消息写入到这个写通道,而这个写通道对应的就是 Peer 连接中的 Response,负责将消息推送给 Peer
Peer 的读协程就负责从 Response 中读取数据,再发送给自己的 raft 流程