分布式系统Consul一致性原理即Raft日志复制实现

背景

Consul 是一个非常强大的服务发现和配置管理工具,可以帮助您简化服务管理流程,提高系统的可用性和可扩展性,是目前非常流行的服务发现和配置管理系统,支持高可用,可扩展,多数据中心的分布式系统,是很多公司的基础实施组件,这些架构的优点的背后是基于分布式协议raft的实现,raft协议的理论有很多,以前需要根据paxos来实现,像zk字节实现了一套zap的协议来实现数据的复制和一致性,hdfs的namenode 日志高可用也是基于paxos实现,技术发展就是快,现在要实现一致性,高可靠,多副本基本上都是采用raft协议来实现,真正讲raft 日志复制实现的比较少,比如k8s用的etcd,nacos cp模型也有raft的实现,rocketmq也有raft实现slave选举,这篇文章主要分享consul raft 协议日志复制的实现原理,尝试讲明白写日志复制,日志顺序,过半提交,一致性检查等相关的知识点和实现原理。

Consul Raft

raft 算法主要包含两个部分,分别是leader选举和日志复制,leader选举我们不分析,我们主要分析日志复制的实现原理,下面我们以consul的key value 存储的写场景入手,一步步分析写请求的实现逻辑,是怎么实现raft 日志复制保证一致性的, 内容会比较长,会涉及到如下知识点:

  • 客户端发起请求
  • Server端接受请求
  • 拆包处理
  • 心跳机制
  • 批量发送
  • 过半提交
  • 一致性检查
  • 总结

Consul Agent 请求

客户端发起一个put key value的http请求,由kvs_endpoint.go 的KVSEndpoint func 处理,put的方法会路由给KVSPut 处理,除了一些校验外和请求标识,比如是否有获取锁acquire或者release,这里提下一个检查,就是value的大小检查,和web 容器一样检查防止请求数据太大,可以通过参数kv_max_value_size 控制,如果超过返回状态码413,标准的http 状态码。

检查都OK后,consul agent就开始请求consul server了,当然还是rpc 操作

// Copy the value
buf := bytes.NewBuffer(nil)
// 这里才开始读请求的数据。
if _, err := io.Copy(buf, req.Body); err != nil {
   return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()

// Make the RPC
var out bool
// 开始请求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
   return nil, err
}

// Only use the out value if this was a CAS
// 没有出错的话,这里就成功返回了
if applyReq.Op == api.KVSet {
        return true, nil
}

agent请求server时,会默认请求server list的第一个节点,只有在失败的请求下,回滚动节点,把失败的添加到最后,原来的第二个节点做为第一个节点,请求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我们的重点要来了

Server Apply

consul server的 apply方法,代码还是show下,这里还有两个逻辑说明下。

// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
   // 检查机房dc是否匹配,不是就转发到对应到dc的server。
   if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
      return err
   }
   // 中间不重要的去了,省得太多...
   // 对权限token 应用ACL policy
   ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
   if err != nil {
      return err
   }
   if !ok {
      *reply = false
      return nil
   }

   // Apply the update.
   // 这里是开启raft 算法的之旅的入口。
   resp, err := k.srv.raftApply(structs.KVSRequestType, args)
   if err != nil {
      k.logger.Error("Raft apply failed", "error", err)
      return err
   }
   if respErr, ok := resp.(error); ok {
      return respErr
   }

   // Check if the return type is a bool.
   if respBool, ok := resp.(bool); ok {
      *reply = respBool
   }
   return nil
}

在真正开始执行raft 算法前,主要做了如下两件事:

先检查了dc是否是当前dc,如果不是会路由到正确的dc,这也是consul 支持多机房部署的一个很好的特性,路由很方便,这也是多机房部署consul是很好的选择。
检查是否启用了acl策略,如果有,需要检查,没有对应的token是不能操作的。

leader的写请求是从kvs的apply方法开始处理请求的,下面我们看下apply 方法的实现逻辑,在真正执行raft前,consul还做了一些加工,不能蛮搞,是非常严谨的,上面通过raftApply,经过几跳后,会执行到raftApplyWithEncoder方法,这里做的工作是很重要的,所以还是拿出来说下,是涨知识的地方,代码如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
   if encoder == nil {
      return nil, fmt.Errorf("Failed to encode request: nil encoder")
   }
   // 对请求编码。
   buf, err := encoder(t, msg)
   if err != nil {
      return nil, fmt.Errorf("Failed to encode request: %v", err)
   }

   // Warn if the command is very large
   if n := len(buf); n > raftWarnSize {
      s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
   }

   var chunked bool
   var future raft.ApplyFuture
   switch {
   case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
      //请求的数据大小如果小于512 * 1024 即512k,则做一次log执行。
      future = s.raft.Apply(buf, enqueueLimit)
   default:
      //超过了512k,则需要分chunk,每个chunk做为一个log来应用。
      chunked = true
      //这里就是每个log一次future。
      future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
   }

   //阻塞,等待raft协议完成。
   if err := future.Error(); err != nil {
      return nil, err
   }

   resp := future.Response()

   //...
   return resp, nil
}

这里通过注释,你也可以看出,主要关心4件事情:

  1. 把请求编码,这个不是我们的重点,后面有时间可以单独分析。
  2. 检查是否要拆包,是否要拆成多个raft command 来执行,这里有个参数控制,SuggestedMaxDataSize consul 默认设置是512k,如果超过这个则拆,否则可以一次raft 协议搞定。
  3. 有一个超时时间,默认是30秒,后面会用到。
  4. 最后事阻塞等待完成,是logfuture。

为什么要拆包

这些事raft 算法不会提的,这个事工程实践才会有的一些优化,此时你也和我一样,为啥要做这个优化呢,有什么好处,解决什么问题,这是我们做一个架构师必须要有的思考。

consul的官方就给出了解释,所以阅读优秀的代码就是一种享受,看注释就能知道为啥这样做,下面是他们对SuggestedMaxDataSize的注释:

// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024

理解就是单次log 数据不能太大,太大理解有下面几个问题:

  • 链接池:leader和follower默认最大3个链接,但新版本的代码实现是没有链接用时,因为日志复制发送会独占一个链接,直到follower应用完成返回成功,才会释放链接,如果发送的数据量大,回影响链接释放的时间,但是目前大版本通过fast path处理以及会新建一个链接来发送,就没有了这个问题。
  • 网络带宽:而且并发高的时候,都是批量发送的,还会同时发给多个follower,有可能导致leader的网络带宽过大,会影响心跳包出现延迟,另外如果heartbeat包也支持follower commit日志,follower端也会影响heartbeat包的处理。

上面说影响到心跳,那我们总要知道heartbeat是怎么实现的,看看到底有什么影响,所以我们把consul的心跳机制实现原理说明下

定时心跳

  • raft 心跳理论
    把日志提交的两阶段优化为了一个阶段,省去了commit阶段,减少了一个rt,提升了吞吐量,为什么能这样优化,是借助下次请求和心跳请求来告诉followe 当前leader的commit index,所以raft 算法认为心跳包也是会带上当前commit index 给follower,让follower可以尽快提交,保持和leader一致,理论是这样的,但是consul并没有这样实现,请继续看下面

  • Consul 心跳实现
    但是consul 在实践的时候并没有这么做,也可能是优化了实现,实现逻辑是每个follower有一个独立的goroutine来负责发送heartbeat,consul leader 给follower发心跳时,只带了一个当前leader的任期和leader自身的id和地址两个信息,不带log 相关的信息,所以follower 处理心跳请求就很简单,只要更新下心跳时间即可,当然也会检查任期,这样就没有了io请求,就能快速响应,也叫fast path,就时直接在io线程处理了,因为follower 处理正常的rpc请求和心跳请求经过decode后,都会统一有follower的main goroutine来处理,如果有一个log append的rpc请求很大,即io操作会大,需要持久化log,很定影响后面的请求,即会影响到心跳请求,follower认为在心跳超时时间内没有收到心跳,则认为leader出了问题,会触发选举,就影响了leader稳定。

  • CommitTimeout
    consul 没有通过心跳机制来让follower尽快和leader保持一致来commit log 到fsm状态机,如果写不连续,那最近一次写follower就会一直不提交,本来是发心跳给follower时会让follower提交,但现在心跳不干这个活了,所以consul 需要一个新的机制来保证即使没有新的写请求的情况下,让follower也尽快和leader保持一致的commit log,这个机制就是commit timeout随机时间,每到了一个时间,consul leader就发一个
    日志复制的请求给follower,该rpc请求除了带上leader任期term和标识信息外,还会告诉follower leader的commitindex,follower 就能比较自己的commitindex,如果小于,则进行提交的流程,把没有应用到状态机的log commit掉。

批量发送

说完了拆包优化逻辑后,我们看下ApplyLog的逻辑,代码如下:

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
   metrics.IncrCounter([]string{"raft", "apply"}, 1)

   var timer <-chan time.Time
   if timeout > 0 {
      timer = time.After(timeout)
   }

   // Create a log future, no index or term yet
   logFuture := &logFuture{
      log: Log{
         Type:       LogCommand,
         Data:       log.Data,
         Extensions: log.Extensions,
      },
   }
   logFuture.init()

   select {
   case <-timer:
      return errorFuture{ErrEnqueueTimeout}
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}
   case r.applyCh <- logFuture:
      return logFuture
   }
}

这里主要关心这个applyCh channel,consul 在初始化leader的时候给创建的一个无缓冲区的通道,所以如果leader的协程在干其他的事情,那这个提交log就阻塞了,时间最长30s,写入成功,就返回了logFuture,也就事前面我们看到future的阻塞。

到这里整个consul leader server的插入请求从接受到阻塞等待的逻辑就完成了,consul leader server 有个核心的go routine 在watch 这个applyCh,从定义可以看出,是应用raft log的channel,阻塞在applych 的go routine 代码如下:

case newLog := <-r.applyCh://这个是前面我们提交log future的
   if r.getLeadershipTransferInProgress() {
      r.logger.Debug(ErrLeadershipTransferInProgress.Error())
      newLog.respond(ErrLeadershipTransferInProgress)
      continue
   }
   // Group commit, gather all the ready commits
   ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
   for i := 0; i < r.conf.MaxAppendEntries; i++ {
      select {
      case newLog := <-r.applyCh:
         ready = append(ready, newLog)
      default:
         break GROUP_COMMIT_LOOP
      }
   }

   // Dispatch the logs
   if stepDown {
      // we're in the process of stepping down as leader, don't process anything new
     //如果发现我们不是leader了,直接响应失败 
     for i := range ready {
         ready[i].respond(ErrNotLeader)
      }
   } else {
      r.dispatchLogs(ready)
   }

这里的一个重要的点就是组发送请求,就是读applyCh的log,这个里做了组提交的优化,最多一次发送MaxAppendEntries个,默认位64个,如果并发高的情况下,这里是能读到一个batch的,在网络传输和io操作,分组提交是一个通用的优化技巧,比如dubbo在rpc网络发送,rocketmq,mysql innodb log提交都用了分组提交技术来充分利用网络io带宽,减少网络来回或者io次数的开销,因为一次大概率是用不完网络或者io带宽的,就像高速4车道的,我们可以一次发四个,而不是一个一个的发。

分组好了后,下面就开始dispatch log了,代码如下:

// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
   now := time.Now()
   defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)

   //获取当前leader的任期编号,这个不会重复是递增的,如果有心的leaer了,会比这个大。
   term := r.getCurrentTerm()
   //log 编号,写一个加1
   lastIndex := r.getLastIndex()

   n := len(applyLogs)
   logs := make([]*Log, n)
   metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))

   //设置每个log的编号和任期
   for idx, applyLog := range applyLogs {
      applyLog.dispatch = now
      lastIndex++
      applyLog.log.Index = lastIndex
      applyLog.log.Term = term
      logs[idx] = &applyLog.log
      r.leaderState.inflight.PushBack(applyLog)
   }

   // Write the log entry locally
   // log先写入本地持久化,consul大部分的版本底层用的是boltdb,boltdb
   // 是一个支持事物的数据库,非常方便,这里会涉及io操作。
   if err := r.logs.StoreLogs(logs); err != nil {
      r.logger.Error("failed to commit logs", "error", err)
      //如果写失败,则直接响应,前面的future阻塞就会唤醒。
      for _, applyLog := range applyLogs {
         applyLog.respond(err)
      }
      //更新自己为follower
      r.setState(Follower)
      return
   }
   //这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
     //这里是记录下leader自己的结果,因为过半leader也算一份。
   r.leaderState.commitment.match(r.localID, lastIndex)

   // Update the last log since it's on disk now
   // 更新最新log entry的编号,写到这里了。
   r.setLastLog(lastIndex, term)

   // Notify the replicators of the new log
   // 开始异步发送给所有的follower,这个leader主go routine的活就干完了。
   for _, f := range r.leaderState.replState {
      asyncNotifyCh(f.triggerCh)
   }
}

这个dispatchlog的逻辑注释里基本写清楚了,核心的go routine 经过一顿操作后,最主要就是两点:

  • 本地持久化log
    consul log持久化是通过boltdb来存储的,boltdb可以看做一个简单版的innodb实现,是一个支撑事务和mvcc的存储引擎, consul 新版本自己实现了log持久化通过wal的方式,可以配置。
  • 为过半成功增加一次记录,记录自己写成功,因为计算过半时,leader自己这一份也算在里面,这个很重要。

又异步交给了replicate go routine来处理,他就去继续去分组提交了,大概率如此循环往复,不知疲倦的给replication routine 派活。

复制GoRoutine

replication routine 会监听triggerCh channel,接受领导的任务,这个比较简单,就开始真正发给各自的follower了,代码如下:

case <-s.triggerCh:
   lastLogIdx, _ := r.getLastLog()
   //这个后面没有异步了,就是这个rpc调用,判断
   shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是rpc调用follower,真正远程rpc给follower,等待响应,这里consul 为保障follower完全和leader的日志一致,需要做有序检查,所以consul leader 在replicate log给follower时,有一个细节要注意下,就是leader 除发当前操作的log entry,还需要带上上一条log entry,每条log entry的有两个关键变量:

  • log.Term 是log所属的leader的任期。
  • log.Index 是log的编号。

这里要发送的日志是就是获取当前最大的log index即lastIndex 做为最大值,然后每个follower维护一个nextIndex,即从那里开始读log,replication goroutine 会从存储里获取nextIndex-->lastIndex 的log,这里可能涉及到io操作,就是把前面的持久化的log,再批量读出来,nextIndex是在复制给follower成功后,会吧lastIndex+1 来更新nextIndex,下次就从新的地方开始读了。

除了log外,还会带上leader当前的CommitIndex,即leader已经应用到状态机FSM的log 索引,follower通过这个来比较,判断自己是否要提交log。

follower 节点通过这两个变量来匹配log是否一致,下面log一致性检查会说明具体怎么用,也会说明为啥要发前面一天log。

过半提交

raft协议要求写操作,只有超过一半才能算成功,才能应用到状态机FSM, 客户端才能读到这个数据,这个过半是leader自己也算在里面的,也就是前面一篇文章我们提到的,leader在持久化log后,就标记自己写成功了,我们没有分析,现在我们来分析下这个逻辑,因为follower 处理完日志复制后,也是有这个逻辑处理的。

//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
//这里是记录下leader自己的结果,因为过半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)

我们上篇文章只是在这里做了一个注释,并没有分析里面怎么实现的,我们就是要搞懂到底怎么实现的,下面是match的代码:

// Match is called once a server completes writing entries to disk: either the
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server ServerID, matchIndex uint64) {
   c.Lock()
   defer c.Unlock()
   if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
      c.matchIndexes[server] = matchIndex
      c.recalculate()
   }
}

注释也基本说明了这个方法的作用,就是我们上面说的,我们就不再重复了,要理解这个逻辑,先了解下这个数据结构matchIndexes,matchIndexes 是一个map,key就是server id,就是consul 集群每个节点有一个id,value就是上次应用log到状态机的编号commitIndex,recalculate的代码如下:

// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
   if len(c.matchIndexes) == 0 {
      return
   }
   matched := make([]uint64, 0, len(c.matchIndexes))
   for _, idx := range c.matchIndexes {
      matched = append(matched, idx)
   }
   //这个排序是降序,才能保证下面取中间索引位置的值来判断是否过半已经复制成功。
   sort.Sort(uint64Slice(matched))
   quorumMatchIndex := matched[(len(matched)-1)/2]
   //如果超过一半的follower成功了,则开始commit,即应用到状态机
   if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
      c.commitIndex = quorumMatchIndex
      //符合条件,触发commit,通知leader执行apply log
      asyncNotifyCh(c.commitCh)
   }
}

这个recalculate的逻辑单独看有点晦涩,先不急于理解,先举一个例子来说明下recalculate的逻辑:

假如集群三个节点,server id分别为1,2,3,上次写log的编号是3,就是leader和follower都成功了,这个matchIndexes的数据如下:

1(leader) --> 3

2(follower) --> 3

3(follower) --> 3

假如这个时候新来一个put请求,leader本地持久化成功,就要更新这个数据结构了matchIndexes了, 因为leader是先更新,再并发请求follower的,所以这个时候matchIndexes数据如下,因为一个log,所以logIndex是加1。

1(leader) --> 4

2(follower) --> 3

3(follower) --> 3

因为leader本地完成和follower远程完成一样,都要通过这个逻辑来判断是否commit 该log 请求,即是否应用到FSM,所以就是要判断是否过半完成了,逻辑是这样的:

  1. 先创建一个数组matched,长度为集群节点数,我们的例子是3,

  2. 然后把matchIndexes的commitIndex 起出来,放到matched中,matched的数据就是[4,3,3]

  3. 排序,为啥要排序,因为map 是无序的,下面要通过中间索引的值来判断是否变化。

  4. 然后计算 quorumMatchIndex := matched[(len(matched)-1)/2],这个就是取中间索引下标的值,也是因为这点,需要第三步排序.

  5. 比较quorumMatchIndex 是否大于当前的commitIndex,如果大于,说明满足过半的条件,则更新,然后应用到状态机。

通过上面5步,来实现了一个过半的逻辑,我们再以两个场景来理下,

假如一个follower失败了,一个成功,成功的follower会更新matched的数据是[4,3,4],或者是[4,4,3],排序后为都是[4,4,3], 第4步计算的结果是4大于3,就可以提交了,经过上面的详细,再回看上面的代码就好理解了。

一致性检查:

raft协议日志复制是需要严格保证顺序的,所以在日志复制的时候follower需要对日志做检查,主要有两种情况:

  • log有gap,比如follower停了一段时间,重新加入集群,这个时候follower的log 编号很多事和leader有差距的,对这种情况,就是日志一致性的保证。

  • follower之前有其他的leader写了日志,需要覆盖,以新的leader为准。

  • leader网络出现问题,集群已经有新的leader,老的leader又活过来后,重新发起日志到follower,这个时候任期比会新的leader小,follower直接返回不处理该请求,老的leader 在检查响应时,先判断follower的term是不是比自己大,就停止复制的工作。

每次log append给follower时,follower会把自己当前的logindex 编号和当前leader的任期term返回给leader,leader获取到对应的编号时,会更新发送logNext,也就是从这里开始发生日志给follower,就进入重试的的流程,重新发日志。

这里consul 根据raft协议做了一个优化,raft协议描述的是每次递减一个logindex 编号,来回确认,直到找到follower匹配的编号,再开始发日志,这样性能就很差,所有基本上没有那个分布式系统是那样实现落地的。

Commit Log

只要超过一半的日志 复制成功,consul 就进入日志commit阶段,也就是将修改应用到状态机,通过recalculate 方法给leader监听的commitCh 发一个消息,通知leader开始执行apply log 到FSM, leader 的代码如下:

case <-r.leaderState.commitCh:
   // Process the newly committed entries
   //上次执行commit log index
   oldCommitIndex := r.getCommitIndex()
   //新的log需要commit的log index,在判断是过半时,会更新commitindex
   commitIndex := r.leaderState.commitment.getCommitIndex()
   r.setCommitIndex(commitIndex)

     ....
   start := time.Now()
   var groupReady []*list.Element
   var groupFutures = make(map[uint64]*logFuture)
   var lastIdxInGroup uint64

   // Pull all inflight logs that are committed off the queue.
   for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
      commitLog := e.Value.(*logFuture)
      idx := commitLog.log.Index
      //idx 大于commitIndex,说明是后面新写入的,还没有同步到follower的日志。
      if idx > commitIndex {
         // Don't go past the committed index
         break
      }

      // Measure the commit time
      metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
      groupReady = append(groupReady, e)
      groupFutures[idx] = commitLog
      lastIdxInGroup = idx
   }

   // Process the group
   if len(groupReady) != 0 {
      //应用的逻辑在这里。groupFutures 就是写入go routine wait的future
      r.processLogs(lastIdxInGroup, groupFutures)
      //清理inflight集合中已经commit过的log,防止重复commit
      for _, e := range groupReady {
         r.leaderState.inflight.Remove(e)
      }
   }

这里比较简单,就是从leaderState.inflight 中取出log,就是我们之前写入的,循环判断,如果log的编号大于commitIndex,说明是后面新写入的log,还没有同步到follower的log,不能提交。这里应该是有序的,lastIdxInGroup 应该就是需要commit的log的最大的一个编号。

processLogs的逻辑就是支持分批提交支持,发给consul 的runFSM的go routine,consul raft专门有一个go routine来负责commit log到状态机,支持批量和一个一个commit,我们看下单个commit的情况,代码如下:

commitSingle := func(req *commitTuple) {
   // Apply the log if a command or config change
   var resp interface{}
   // Make sure we send a response
   defer func() {
      // Invoke the future if given
      if req.future != nil {
         req.future.response = resp
         req.future.respond(nil)
      }
   }()

   switch req.log.Type {
   case LogCommand:
      start := time.Now()
      //将日志应用到FSM的关键在这里。 
      resp = r.fsm.Apply(req.log)
      metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
    ....
   
   }

   // Update the indexes
   lastIndex = req.log.Index
   lastTerm = req.log.Term
}

主要就是三点,应用log 到fsm,然后跟新下fsm的logindex和任期,最后就是要通知还在wait的go routine。

整个日志复制的流程很长,最后再上一张图总结下整个过程:

consul-raft-log-replication.png

总结

这篇文章主要基于consul 目前版本的实现和基于个人的理解,以consul 一个写请求的整个过程为线索,介绍了consul 基于raft协议的实现日志复制的基本过程,重点介绍了日志顺序保证措施,日志一致性检查,过半提交log,心跳机制的实现原理,以及相关的几个优化措施,比如大请求分拆,分组批量发送等一些实践优化措施,如有不正确的地方,欢迎交流和指正,目前正在看机会中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容