EtcdRaft源码分析(提交数据)

下面我们来看下外部提交数据,Raft是怎么处理的。

Client

Note接口

type Node interface {
    ...
   // Propose proposes that data be appended to the log. Note that proposals can be lost without
   // notice, therefore it is user's job to ensure proposal retries.
   Propose(ctx context.Context, data []byte) error
   ...
}

外部Client提交数据就是调用Propose方法。

Follower

case pb.MsgProp:
        if r.lead == None {
            return ErrProposalDropped
        } else if r.disableProposalForwarding {
            return ErrProposalDropped
        }
        m.To = r.lead
        r.send(m)

Follower接到Propose请求,可能的话会转发给Leader。

Candidate

case pb.MsgProp:
   r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
   return ErrProposalDropped

如果是Candidate收到的话,直接丢弃

Leader

case pb.MsgProp:
   if len(m.Entries) == 0 {
      r.logger.Panicf("%x stepped empty MsgProp", r.id)
   }
   if _, ok := r.prs[r.id]; !ok {
      // If we are not currently a member of the range (i.e. this node
      // was removed from the configuration while serving as leader),
      // drop any new proposals.
      return ErrProposalDropped
   }
   if r.leadTransferee != None {
      r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
      return ErrProposalDropped
   }

   for i, e := range m.Entries {
      if e.Type == pb.EntryConfChange {
         if r.pendingConfIndex > r.raftLog.applied {
            r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
               e.String(), r.pendingConfIndex, r.raftLog.applied)
            m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
         } else {
            r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
         }
      }
   }

   if !r.appendEntry(m.Entries...) {
      return ErrProposalDropped
   }
   r.bcastAppend()
   return nil
  • 当然了,你不能玩假的,数据不能为空
  • 看成员列表有没有自己,如果没有的话,丢弃。配置变更会把自己在ptr里面删掉么?待分析
  • 权力转移期间,也会丢弃
  • 遍历entries,如果发现里面配置更新的提案,且r.pendingConfIndex > r.raftLog.applied,那将这条新的提案置空,否则将这条配置更新的位置记到pendingConfIndex

appendEntry

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
   li := r.raftLog.lastIndex()
   for i := range es {
      es[i].Term = r.Term
      es[i].Index = li + 1 + uint64(i)
   }
   // Track the size of this uncommitted proposal.
   if !r.increaseUncommittedSize(es) {
      r.logger.Debugf(
         "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
         r.id,
      )
      // Drop the proposal.
      return false
   }
   // use latest "last" index after truncate/append
   li = r.raftLog.append(es...)
   r.getProgress(r.id).maybeUpdate(li)
   // Regardless of maybeCommit's return, our caller will call bcastAppend.
   r.maybeCommit()
   return true
}
  • 这些数据进来首先第一件事情要给他们分配Leader现在最新的任期,以及index(根据lastindex)
  • 如果这次提交得数据让uncommittedSize累加起来的size超过阈值,那么这次数据提交也得丢弃
  • append到本地unstable,返回现在最新得lastindex
  • 根据lastindex,maybeUpdate,就是将自己的进度更新,Match=lastindex,Next=lastindex+1,updated=true
  • maybeCommit, 会去收集成员的进度,然后伺机提交,不过刚开始,这里不会有效果

bcastAppend

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
   pr := r.getProgress(to)
   if pr.IsPaused() {
      return false
   }
   m := pb.Message{}
   m.To = to

   term, errt := r.raftLog.term(pr.Next - 1)
   ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
   if len(ents) == 0 && !sendIfEmpty {
      return false
   }

   if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
      if !pr.RecentActive {
         r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
         return false
      }

      m.Type = pb.MsgSnap
      snapshot, err := r.raftLog.snapshot()
      if err != nil {
         if err == ErrSnapshotTemporarilyUnavailable {
            r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
            return false
         }
         panic(err) // TODO(bdarnell)
      }
      if IsEmptySnap(snapshot) {
         panic("need non-empty snapshot")
      }
      m.Snapshot = snapshot
      sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
      r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
         r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
      pr.becomeSnapshot(sindex)
      r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
   } else {
      m.Type = pb.MsgApp
      m.Index = pr.Next - 1
      m.LogTerm = term
      m.Entries = ents
      m.Commit = r.raftLog.committed
      if n := len(m.Entries); n != 0 {
         switch pr.State {
         // optimistically increase the next when in ProgressStateReplicate
         case ProgressStateReplicate:
            last := m.Entries[n-1].Index
            pr.optimisticUpdate(last)
            pr.ins.add(last)
         case ProgressStateProbe:
            pr.pause()
         default:
            r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
         }
      }
   }
   r.send(m)
   return true
}
  • 拿到对方的Progress,也就是进度。
  • 打包当前节点Next之后的entries
  • 打包当前节点Next-1的(任期,index),作为接收人校验用
  • 将自己committed的情况发给对方
  • 准备发MsgApp消息给对方
  • Snapshot的情况
    • 如果Next-1的任期或之后的entries如果查不到,那肯定就在snapshot里面
    • 拿出当前节点存储的snapshot,有可能在unstable或storage里面
    • 将对方的Progress设为ProgressStateSnapshot,且设置PendingSnapshot为snapshot的index
    • 准备发MsgSnap消息给对方

总结

后面的流程请参见EtcdRaft源码分析(日志复制)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容