背景
Consul 是一个非常强大的服务发现和配置管理工具,可以帮助您简化服务管理流程,提高系统的可用性和可扩展性,是目前非常流行的服务发现和配置管理系统,支持高可用,可扩展,多数据中心的分布式系统,是很多公司的基础实施组件,这些架构的优点的背后是基于分布式协议raft的实现,raft协议的理论有很多,以前需要根据paxos来实现,像zk字节实现了一套zap的协议来实现数据的复制和一致性,hdfs的namenode 日志高可用也是基于paxos实现,技术发展就是快,现在要实现一致性,高可靠,多副本基本上都是采用raft协议来实现,真正讲raft 日志复制实现的比较少,比如k8s用的etcd,nacos cp模型也有raft的实现,rocketmq也有raft实现slave选举,这篇文章主要分享consul raft 协议日志复制的实现原理,尝试讲明白写日志复制,日志顺序,过半提交,一致性检查等相关的知识点和实现原理。
Consul Raft
raft 算法主要包含两个部分,分别是leader选举和日志复制,leader选举我们不分析,我们主要分析日志复制的实现原理,下面我们以consul的key value 存储的写场景入手,一步步分析写请求的实现逻辑,是怎么实现raft 日志复制保证一致性的, 内容会比较长,会涉及到如下知识点:
- 客户端发起请求
- Server端接受请求
- 拆包处理
- 心跳机制
- 批量发送
- 过半提交
- 一致性检查
- 总结
Consul Agent 请求
客户端发起一个put key value的http请求,由kvs_endpoint.go 的KVSEndpoint func 处理,put的方法会路由给KVSPut 处理,除了一些校验外和请求标识,比如是否有获取锁acquire或者release,这里提下一个检查,就是value的大小检查,和web 容器一样检查防止请求数据太大,可以通过参数kv_max_value_size 控制,如果超过返回状态码413,标准的http 状态码。
检查都OK后,consul agent就开始请求consul server了,当然还是rpc 操作
// Copy the value
buf := bytes.NewBuffer(nil)
// 这里才开始读请求的数据。
if _, err := io.Copy(buf, req.Body); err != nil {
return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
// 开始请求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
// 没有出错的话,这里就成功返回了
if applyReq.Op == api.KVSet {
return true, nil
}
agent请求server时,会默认请求server list的第一个节点,只有在失败的请求下,回滚动节点,把失败的添加到最后,原来的第二个节点做为第一个节点,请求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我们的重点要来了
Server Apply
consul server的 apply方法,代码还是show下,这里还有两个逻辑说明下。
// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// 检查机房dc是否匹配,不是就转发到对应到dc的server。
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
// 中间不重要的去了,省得太多...
// 对权限token 应用ACL policy
ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
if err != nil {
return err
}
if !ok {
*reply = false
return nil
}
// Apply the update.
// 这里是开启raft 算法的之旅的入口。
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
在真正开始执行raft 算法前,主要做了如下两件事:
先检查了dc是否是当前dc,如果不是会路由到正确的dc,这也是consul 支持多机房部署的一个很好的特性,路由很方便,这也是多机房部署consul是很好的选择。
检查是否启用了acl策略,如果有,需要检查,没有对应的token是不能操作的。
leader的写请求是从kvs的apply方法开始处理请求的,下面我们看下apply 方法的实现逻辑,在真正执行raft前,consul还做了一些加工,不能蛮搞,是非常严谨的,上面通过raftApply,经过几跳后,会执行到raftApplyWithEncoder方法,这里做的工作是很重要的,所以还是拿出来说下,是涨知识的地方,代码如下:
// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
if encoder == nil {
return nil, fmt.Errorf("Failed to encode request: nil encoder")
}
// 对请求编码。
buf, err := encoder(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
}
var chunked bool
var future raft.ApplyFuture
switch {
case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
//请求的数据大小如果小于512 * 1024 即512k,则做一次log执行。
future = s.raft.Apply(buf, enqueueLimit)
default:
//超过了512k,则需要分chunk,每个chunk做为一个log来应用。
chunked = true
//这里就是每个log一次future。
future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
}
//阻塞,等待raft协议完成。
if err := future.Error(); err != nil {
return nil, err
}
resp := future.Response()
//...
return resp, nil
}
这里通过注释,你也可以看出,主要关心4件事情:
- 把请求编码,这个不是我们的重点,后面有时间可以单独分析。
- 检查是否要拆包,是否要拆成多个raft command 来执行,这里有个参数控制,SuggestedMaxDataSize consul 默认设置是512k,如果超过这个则拆,否则可以一次raft 协议搞定。
- 有一个超时时间,默认是30秒,后面会用到。
- 最后事阻塞等待完成,是logfuture。
为什么要拆包
这些事raft 算法不会提的,这个事工程实践才会有的一些优化,此时你也和我一样,为啥要做这个优化呢,有什么好处,解决什么问题,这是我们做一个架构师必须要有的思考。
consul的官方就给出了解释,所以阅读优秀的代码就是一种享受,看注释就能知道为啥这样做,下面是他们对SuggestedMaxDataSize的注释:
// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024
理解就是单次log 数据不能太大,太大理解有下面几个问题:
- 链接池:leader和follower默认最大3个链接,但新版本的代码实现是没有链接用时,因为日志复制发送会独占一个链接,直到follower应用完成返回成功,才会释放链接,如果发送的数据量大,回影响链接释放的时间,但是目前大版本通过fast path处理以及会新建一个链接来发送,就没有了这个问题。
- 网络带宽:而且并发高的时候,都是批量发送的,还会同时发给多个follower,有可能导致leader的网络带宽过大,会影响心跳包出现延迟,另外如果heartbeat包也支持follower commit日志,follower端也会影响heartbeat包的处理。
上面说影响到心跳,那我们总要知道heartbeat是怎么实现的,看看到底有什么影响,所以我们把consul的心跳机制实现原理说明下
定时心跳
raft 心跳理论
把日志提交的两阶段优化为了一个阶段,省去了commit阶段,减少了一个rt,提升了吞吐量,为什么能这样优化,是借助下次请求和心跳请求来告诉followe 当前leader的commit index,所以raft 算法认为心跳包也是会带上当前commit index 给follower,让follower可以尽快提交,保持和leader一致,理论是这样的,但是consul并没有这样实现,请继续看下面Consul 心跳实现
但是consul 在实践的时候并没有这么做,也可能是优化了实现,实现逻辑是每个follower有一个独立的goroutine来负责发送heartbeat,consul leader 给follower发心跳时,只带了一个当前leader的任期和leader自身的id和地址两个信息,不带log 相关的信息,所以follower 处理心跳请求就很简单,只要更新下心跳时间即可,当然也会检查任期,这样就没有了io请求,就能快速响应,也叫fast path,就时直接在io线程处理了,因为follower 处理正常的rpc请求和心跳请求经过decode后,都会统一有follower的main goroutine来处理,如果有一个log append的rpc请求很大,即io操作会大,需要持久化log,很定影响后面的请求,即会影响到心跳请求,follower认为在心跳超时时间内没有收到心跳,则认为leader出了问题,会触发选举,就影响了leader稳定。CommitTimeout
consul 没有通过心跳机制来让follower尽快和leader保持一致来commit log 到fsm状态机,如果写不连续,那最近一次写follower就会一直不提交,本来是发心跳给follower时会让follower提交,但现在心跳不干这个活了,所以consul 需要一个新的机制来保证即使没有新的写请求的情况下,让follower也尽快和leader保持一致的commit log,这个机制就是commit timeout随机时间,每到了一个时间,consul leader就发一个
日志复制的请求给follower,该rpc请求除了带上leader任期term和标识信息外,还会告诉follower leader的commitindex,follower 就能比较自己的commitindex,如果小于,则进行提交的流程,把没有应用到状态机的log commit掉。
批量发送
说完了拆包优化逻辑后,我们看下ApplyLog的逻辑,代码如下:
// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
这里主要关心这个applyCh channel,consul 在初始化leader的时候给创建的一个无缓冲区的通道,所以如果leader的协程在干其他的事情,那这个提交log就阻塞了,时间最长30s,写入成功,就返回了logFuture,也就事前面我们看到future的阻塞。
到这里整个consul leader server的插入请求从接受到阻塞等待的逻辑就完成了,consul leader server 有个核心的go routine 在watch 这个applyCh,从定义可以看出,是应用raft log的channel,阻塞在applych 的go routine 代码如下:
case newLog := <-r.applyCh://这个是前面我们提交log future的
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
//如果发现我们不是leader了,直接响应失败
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
这里的一个重要的点就是组发送请求,就是读applyCh的log,这个里做了组提交的优化,最多一次发送MaxAppendEntries个,默认位64个,如果并发高的情况下,这里是能读到一个batch的,在网络传输和io操作,分组提交是一个通用的优化技巧,比如dubbo在rpc网络发送,rocketmq,mysql innodb log提交都用了分组提交技术来充分利用网络io带宽,减少网络来回或者io次数的开销,因为一次大概率是用不完网络或者io带宽的,就像高速4车道的,我们可以一次发四个,而不是一个一个的发。
分组好了后,下面就开始dispatch log了,代码如下:
// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
//获取当前leader的任期编号,这个不会重复是递增的,如果有心的leaer了,会比这个大。
term := r.getCurrentTerm()
//log 编号,写一个加1
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
//设置每个log的编号和任期
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
// log先写入本地持久化,consul大部分的版本底层用的是boltdb,boltdb
// 是一个支持事物的数据库,非常方便,这里会涉及io操作。
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
//如果写失败,则直接响应,前面的future阻塞就会唤醒。
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
//更新自己为follower
r.setState(Follower)
return
}
//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
//这里是记录下leader自己的结果,因为过半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
// 更新最新log entry的编号,写到这里了。
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
// 开始异步发送给所有的follower,这个leader主go routine的活就干完了。
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
这个dispatchlog的逻辑注释里基本写清楚了,核心的go routine 经过一顿操作后,最主要就是两点:
- 本地持久化log
consul log持久化是通过boltdb来存储的,boltdb可以看做一个简单版的innodb实现,是一个支撑事务和mvcc的存储引擎, consul 新版本自己实现了log持久化通过wal的方式,可以配置。 - 为过半成功增加一次记录,记录自己写成功,因为计算过半时,leader自己这一份也算在里面,这个很重要。
又异步交给了replicate go routine来处理,他就去继续去分组提交了,大概率如此循环往复,不知疲倦的给replication routine 派活。
复制GoRoutine
replication routine 会监听triggerCh channel,接受领导的任务,这个比较简单,就开始真正发给各自的follower了,代码如下:
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
//这个后面没有异步了,就是这个rpc调用,判断
shouldStop = r.replicateTo(s, lastLogIdx)
replicateTo 就是rpc调用follower,真正远程rpc给follower,等待响应,这里consul 为保障follower完全和leader的日志一致,需要做有序检查,所以consul leader 在replicate log给follower时,有一个细节要注意下,就是leader 除发当前操作的log entry,还需要带上上一条log entry,每条log entry的有两个关键变量:
- log.Term 是log所属的leader的任期。
- log.Index 是log的编号。
这里要发送的日志是就是获取当前最大的log index即lastIndex 做为最大值,然后每个follower维护一个nextIndex,即从那里开始读log,replication goroutine 会从存储里获取nextIndex-->lastIndex 的log,这里可能涉及到io操作,就是把前面的持久化的log,再批量读出来,nextIndex是在复制给follower成功后,会吧lastIndex+1 来更新nextIndex,下次就从新的地方开始读了。
除了log外,还会带上leader当前的CommitIndex,即leader已经应用到状态机FSM的log 索引,follower通过这个来比较,判断自己是否要提交log。
follower 节点通过这两个变量来匹配log是否一致,下面log一致性检查会说明具体怎么用,也会说明为啥要发前面一天log。
过半提交
raft协议要求写操作,只有超过一半才能算成功,才能应用到状态机FSM, 客户端才能读到这个数据,这个过半是leader自己也算在里面的,也就是前面一篇文章我们提到的,leader在持久化log后,就标记自己写成功了,我们没有分析,现在我们来分析下这个逻辑,因为follower 处理完日志复制后,也是有这个逻辑处理的。
//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
//这里是记录下leader自己的结果,因为过半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)
我们上篇文章只是在这里做了一个注释,并没有分析里面怎么实现的,我们就是要搞懂到底怎么实现的,下面是match的代码:
// Match is called once a server completes writing entries to disk: either the
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
c.matchIndexes[server] = matchIndex
c.recalculate()
}
}
注释也基本说明了这个方法的作用,就是我们上面说的,我们就不再重复了,要理解这个逻辑,先了解下这个数据结构matchIndexes,matchIndexes 是一个map,key就是server id,就是consul 集群每个节点有一个id,value就是上次应用log到状态机的编号commitIndex,recalculate的代码如下:
// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
if len(c.matchIndexes) == 0 {
return
}
matched := make([]uint64, 0, len(c.matchIndexes))
for _, idx := range c.matchIndexes {
matched = append(matched, idx)
}
//这个排序是降序,才能保证下面取中间索引位置的值来判断是否过半已经复制成功。
sort.Sort(uint64Slice(matched))
quorumMatchIndex := matched[(len(matched)-1)/2]
//如果超过一半的follower成功了,则开始commit,即应用到状态机
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
c.commitIndex = quorumMatchIndex
//符合条件,触发commit,通知leader执行apply log
asyncNotifyCh(c.commitCh)
}
}
这个recalculate的逻辑单独看有点晦涩,先不急于理解,先举一个例子来说明下recalculate的逻辑:
假如集群三个节点,server id分别为1,2,3,上次写log的编号是3,就是leader和follower都成功了,这个matchIndexes的数据如下:
1(leader) --> 3
2(follower) --> 3
3(follower) --> 3
假如这个时候新来一个put请求,leader本地持久化成功,就要更新这个数据结构了matchIndexes了, 因为leader是先更新,再并发请求follower的,所以这个时候matchIndexes数据如下,因为一个log,所以logIndex是加1。
1(leader) --> 4
2(follower) --> 3
3(follower) --> 3
因为leader本地完成和follower远程完成一样,都要通过这个逻辑来判断是否commit 该log 请求,即是否应用到FSM,所以就是要判断是否过半完成了,逻辑是这样的:
先创建一个数组matched,长度为集群节点数,我们的例子是3,
然后把matchIndexes的commitIndex 起出来,放到matched中,matched的数据就是[4,3,3]
排序,为啥要排序,因为map 是无序的,下面要通过中间索引的值来判断是否变化。
然后计算 quorumMatchIndex := matched[(len(matched)-1)/2],这个就是取中间索引下标的值,也是因为这点,需要第三步排序.
比较quorumMatchIndex 是否大于当前的commitIndex,如果大于,说明满足过半的条件,则更新,然后应用到状态机。
通过上面5步,来实现了一个过半的逻辑,我们再以两个场景来理下,
假如一个follower失败了,一个成功,成功的follower会更新matched的数据是[4,3,4],或者是[4,4,3],排序后为都是[4,4,3], 第4步计算的结果是4大于3,就可以提交了,经过上面的详细,再回看上面的代码就好理解了。
一致性检查:
raft协议日志复制是需要严格保证顺序的,所以在日志复制的时候follower需要对日志做检查,主要有两种情况:
log有gap,比如follower停了一段时间,重新加入集群,这个时候follower的log 编号很多事和leader有差距的,对这种情况,就是日志一致性的保证。
follower之前有其他的leader写了日志,需要覆盖,以新的leader为准。
leader网络出现问题,集群已经有新的leader,老的leader又活过来后,重新发起日志到follower,这个时候任期比会新的leader小,follower直接返回不处理该请求,老的leader 在检查响应时,先判断follower的term是不是比自己大,就停止复制的工作。
每次log append给follower时,follower会把自己当前的logindex 编号和当前leader的任期term返回给leader,leader获取到对应的编号时,会更新发送logNext,也就是从这里开始发生日志给follower,就进入重试的的流程,重新发日志。
这里consul 根据raft协议做了一个优化,raft协议描述的是每次递减一个logindex 编号,来回确认,直到找到follower匹配的编号,再开始发日志,这样性能就很差,所有基本上没有那个分布式系统是那样实现落地的。
Commit Log
只要超过一半的日志 复制成功,consul 就进入日志commit阶段,也就是将修改应用到状态机,通过recalculate 方法给leader监听的commitCh 发一个消息,通知leader开始执行apply log 到FSM, leader 的代码如下:
case <-r.leaderState.commitCh:
// Process the newly committed entries
//上次执行commit log index
oldCommitIndex := r.getCommitIndex()
//新的log需要commit的log index,在判断是过半时,会更新commitindex
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
....
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
//idx 大于commitIndex,说明是后面新写入的,还没有同步到follower的日志。
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
// Process the group
if len(groupReady) != 0 {
//应用的逻辑在这里。groupFutures 就是写入go routine wait的future
r.processLogs(lastIdxInGroup, groupFutures)
//清理inflight集合中已经commit过的log,防止重复commit
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
这里比较简单,就是从leaderState.inflight 中取出log,就是我们之前写入的,循环判断,如果log的编号大于commitIndex,说明是后面新写入的log,还没有同步到follower的log,不能提交。这里应该是有序的,lastIdxInGroup 应该就是需要commit的log的最大的一个编号。
processLogs的逻辑就是支持分批提交支持,发给consul 的runFSM的go routine,consul raft专门有一个go routine来负责commit log到状态机,支持批量和一个一个commit,我们看下单个commit的情况,代码如下:
commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change
var resp interface{}
// Make sure we send a response
defer func() {
// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}()
switch req.log.Type {
case LogCommand:
start := time.Now()
//将日志应用到FSM的关键在这里。
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
....
}
// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term
}
主要就是三点,应用log 到fsm,然后跟新下fsm的logindex和任期,最后就是要通知还在wait的go routine。
整个日志复制的流程很长,最后再上一张图总结下整个过程:
总结
这篇文章主要基于consul 目前版本的实现和基于个人的理解,以consul 一个写请求的整个过程为线索,介绍了consul 基于raft协议的实现日志复制的基本过程,重点介绍了日志顺序保证措施,日志一致性检查,过半提交log,心跳机制的实现原理,以及相关的几个优化措施,比如大请求分拆,分组批量发送等一些实践优化措施,如有不正确的地方,欢迎交流和指正,目前正在看机会中。