发送消息流程
流程简介
上图是raft中发送消息的流程。大致分为以下的过程。
- 由raftexample/raft.go部分发起。前文提到过该部分主要是为应用层提供服务。
- 经由raft/node.go部分转发到共识模块。前文提到过该部分主要起到应用层和共识模块的衔接作用。
- 共识模块raft/raft.go,对消息进行处理后,将消息append到msgs。注意共识模块自身不管是如何将消息通过通讯传输到其他节点的。这边只是放入到msgs中。
- raft/node.go部分监测到msgs中有内容了。使用newReady生成一个一个新的ready对象,并发送给readyc通道。
- raftexample/raft.go中,监测到readyc通道有内容之后,会进行写wal日志,增加Storage日志条目,判断是否需要打快照等动作。最后通过transoport组件,经http协议将消息发送给对应的节点。
代码详解
- 消息的发起部分。raft中有3类RPC,分别为AppendEntries RPCs,RequestVote RPCs,以及InstallSnapshot RPCs。通过发起的形式分成以下两种。
1.1 raft自行触发的比方说leader通过AppendEntries发送心跳包,以及RequestVote RPC等。图中最左部分的tickerC是发起者,是通过设置时钟来完成的。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
for {
select {
case <-ticker.C:
rc.node.Tick()
/*其他情况*/
}
}
}
1.2 首先由客户端发送请求给leader,然后leader向其他节点发送AppendEntires来进行日志备份。图中最左部分的proposeC即是http server接受到客户端请求后,将消息通过proposeC传递到raftNode中。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
// send proposals over raft
go func() {
var confChangeCount uint64 = 0
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
}
- 经由node.go转发消息。
2.1 tick消息,调用node的Tick方法。方法中为向tickc发送消息。后由run中逻辑处理。见2.3
//github.com/coreos/raft/node.go
// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
select {
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.logger.Warningf("A tick missed to fire. Node blocks too long!")
}
}
2.2 propose消息,调用node的Propose方法。方法中调用node的step对消息进行判断,因为这边消息类型是MsgProp,所以是将消息m发送给propc通道。后由run中逻辑处理。见2.3
//github.com/coreos/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
ch := n.recvc
if m.Type == pb.MsgProp {
ch = n.propc
}
select {
case ch <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
2.3 run中处理各种消息。收到tickc时调用raft的tick方法。在收到propc消息时,将m的发送者标记为raft的id,然后调用raft的Step方法。
//github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
/*其他逻辑暂时忽略*/
for {
/*其他逻辑暂时忽略,下文接收消息部分会再讨论*/
select {
case m := <-propc:
m.From = r.id
r.Step(m)
case <-n.tickc:
r.tick()
/*其他情况暂时忽略*/
}
}
}
- 共识模块raft中处理消息。
3.1 对于tick的情况根据当前节点的状态,有两个tick方法。leader调用的是tickHeartbeat,follower和candidate调用的是tickElection。两个方法都是达到相应的配置的时间的时候调用Step方法进行相应的处理。
//github.com/coreos/etcd/raft/raft.go
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
r.heartbeatElapsed++
r.electionElapsed++
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
if r.state != StateLeader {
return
}
if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
3.2 Step方法。共识模块中最重要的方法,根据消息的类型和自身的状态进行响应。该部分逻辑留作下文分析。不过可以观察到Step方法在判断出该消息是需要共识给其他节点的时候,会通过调用sendAppend、sendHeartbeat、bcastAppend、bcastHeartbeat、bcastHeartbeatWithCtx几个方法,或者是直接调用send方法。
3.3 send方法,send方法结合term和msg的type对消息进行处理。然后append到msgs中。注意这里只进行了append,而不是真正的发送。
func (r *raft) send(m pb.Message) {
m.From = r.id
if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
if m.Term == 0 {
panic(fmt.Sprintf("term should be set when sending %s", m.Type))
}
} else {
if m.Term != 0 {
panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
}
if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
m.Term = r.Term
}
}
r.msgs = append(r.msgs, m)
}
- node部分中run方法中调用newReady方法,将新的消息,一些未写入到文件的日志条目等放入生成ready对象rd,如果有更新,则会走select中的case readyc<-rd,然后将各个消息重置为nil。
// github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
/*其他逻辑*/
select {
/*其他情况*/
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
}
}
}
将msgs等信息放入到Ready中
// github.com/coreos/raft/node.go
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
}
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
return rd
}
判断Ready中是否有新的信息
//github.com/coreos/raft/node.go
func (rd Ready) containsUpdates() bool {
return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}
- 有新的消息时,node那边会将消息发送到readyc通道。raftexample中raftNode的serverChannels会收到Readc并进行处理。将日志条目写入到wal文件和memory storage中,如果有快照保存快照并提交给state machine去执行。通过transport将消息发送给其他节点。如果有的可提交的日志条目也提交给state machine去执行。判断是否需要触发snapshot,最后发送advance给node。
经过该步之后,日志条目才写入到wal文件和memorty storage。消息也才经Transport发送给其他节点。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
for {
select {
// store raft entries to wal, then publish over commit channel
//node那边发送readyc后,这边收到消息
case rd := <-rc.node.Ready():
//将日志条目写入到wal文件
rc.wal.Save(rd.HardState, rd.Entries)
//如果snap不空,则保存快照文件,并提交
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
//将日志条目写入到storage,storage是memorystorage
rc.raftStorage.Append(rd.Entries)
//将消息通过Transport发送给其他节点。
rc.transport.Send(rd.Messages)
//如果日志条目中有新的可提交日志,则提交到state machine那边执行
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
//发送advance给node
rc.node.Advance()
/*其他情况*/
}
}
}
node节点收到advance消息,即确定了之前的消息已写入wal文件和storage等。然后更新raftLog信息。并将advancec赋值为空,即保证下一次新消息的执行。
//github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
for {
/*其他逻辑*/
select {
/*其他情况*/
case <-advancec:
//更新raft中日志的信息
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
}
}
}
接收消息流程
流程简介
- 由transport模块流入,收到其他节点传递过来的消息。
- 通过raftexample中的raftNode的Process方法,调用node来处理消息。
- node部分对消息进行简单的判断和处理后,将消息转由共识模块处理。
- 共识模块处理消息。
代码详解
- transport模块接收到消息。调用raftNode中的Process方法。
// github.com/coreos/etcd/rafthttp/peer.go
func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
/*其他逻辑*/
go func() {
for {
select {
//接收到消息后,调用Process方法
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
plog.Warningf("failed to process raft message (%v)", err)
}
case <-p.stopc:
return
}
}
}()
}
2.经由raftNode的Process调用node的Step。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
return rc.node.Step(ctx, m)
}
- Step判断是否为自身的消息,如果不是则通过step处理。step中消息不是MsgProp类型,所以是将消息m发送给recvc通道。后由run 中逻辑处理。
// github.com/coreos/raft/node.go
func (n *node) Step(ctx context.Context, m pb.Message) error {
// ignore unexpected local messages receiving over network
if IsLocalMsg(m.Type) {
// TODO: return an error?
return nil
}
return n.step(ctx, m)
}
// github.com/coreos/raft/node.go
func (n *node) step(ctx context.Context, m pb.Message) error {
ch := n.recvc
if m.Type == pb.MsgProp {
ch = n.propc
}
select {
case ch <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
3.1 run中将收到的消息发送给共识模块的Step方法。
// github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
for {
/*其他逻辑*/
select {
/*其他情况*/
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error
}
}
}
}
- 共识模块中的Step根据消息类型和信息以及自身状态进行相应的操作。