JRaft源码剖析3-日志复制

1.日志复制原理

Raft 中的日志主要可以分为两类:

  • 一类是协议自身运行所生成的日志,例如集群节点配置变更信息;
  • 另外一类就是用户向集群提交的指令所生成的日志。

为了让集群中的各个节点达成共识,Leader 节点需要将日志数据复制给集群中的各个节点,并采用投票机制让这些节点决定是否许可日志对应的操作。对于被许可的操作日志,各个节点会严格按照相同的顺序在本地进行存储,并重放日志对应的操作,以此实现节点之间的共识。

JRaft 在设计和实现层面为每个 Follower 和 Learner 节点都绑定了一个复制器 Replicator 实例,由 Replicator 负责向目标节点复制日志数据,Replicator 实例之间彼此相互隔离,互不影响,并由 ReplicatorGroup 进行统一管理。日志复制需要涉及到集群中节点之间的频繁通信和数据传输,所以需要保证复制操作的高性能,并且不允许出现乱序和断层。为此,JRaft 引入了多种优化策略,包括:Follower 节点之间并发复制、批量发送,以及 Pipeline 机制等。

日志复制从广义层面而言除了复制单条的 LogEntry 外,还包含向目标节点复制快照数据。

2.日志生成

JRaft 提供了 Node#apply 交互接口以让业务向 JRaft 集群提交操作指令,这些指令以 Task 的形式在集群中流转,并以日志的形式记录到 Leader 节点中,同时同步给集群中所有的 Follower 节点,并最终透传给所有成功完成日志复制的集群节点状态机。

NodeImpl#apply将Task 封装成 LogEntry 对象,并以事件的形式投递给 Disruptor 队列进行异步处理,用户可以通过 Task 的 Task#done 字段感知任务被状态机处理的响应状态。LogEntryAndClosureHandler 实现了 EventHandler 接口,用于消费 Disruptor 队列中的事件。

NodeImpl.LogEntryAndClosureHandler
-> NodeImpl#executeApplyingTasks

  • 1)只有Leader节点才允许处理Task
  • 2)遍历处理每个Task。如果 Task 期望校验 term 值,则校验当前节点的 term 值是否是期望的 term 值;为每个 task 创建并初始化对应的选票,用于决策对应的 LogEntry 是否允许被提交(this.ballotBox.appendPendingTask)。
     A) 创建并初始化选票Ballot
     B) 节点成为 Leader 之后必须调用 BallotBox#resetPendingIndex 方法重置 pendingIndex 值
     C) 记录选票,用于检查是否赢得过半数选票this.pendingMetaQueue.add(bl)
     D) 记录 Task 的回调 done 对象,当对应的日志被 committed 时触发执行this.closureQueue.appendPendingClosure(done)
  • 3)追加日志数据到本地文件系统,完成之后回调 LeaderStableClosure。(this.logManager.appendEntries)
    LogManagerImpl#appendEntries
     A)对于 Leader 节点而言,基于本地 lastLogIndex 值设置各个 LogEntry 的 logIndex
     B)对于 Follower 节点而言(LogManagerImpl#checkAndResolveConflict)
      B-1)待写入的日志数据与本地已有的日志数据存在断层,此时只能返回错误。
      B-2)待写入的日志数据相对于本地已有的日志数据更老,即最大的 logIndex 小于等于本地已经写入的日志数据的 logIndex,直接忽略。
      B-3)待写入的日志数据与本地已有的日志数据正好衔接上,直接递增 lastLogIndex 即可。
      B-4)待写入的日志数据与本地已有的日志数据存在重叠,此时需要判断是否存在冲突,并强行覆盖本地存在冲突的数据。
     C)完成对于冲突数据的处理,LogManager 会先将日志数据写入内存
     D)并将日志数据以 OTHER 类型事件的形式提交给 Disruptor 队列,用于实现异步刷盘。
      D-1)StableClosureEventHandler 处理,其定义了一个 AppendBatcher 类型的字段,用于缓存待写入的数据。
      D-2)AppendBatcher#flush 用于执行将缓存的数据写入存储系统
      D-3)LogManagerImpl#appendToStorage 实现了将数据写入存储系统的逻辑,默认也就是写入 RocksDB 存储引擎
     E)LogManagerImpl#wakeupAllWaiter 用于通知那些等待新数据到达的复制器 Replicator 实例,这些 Replicator 在向目标 Follower 或 Learner 节点复制日志数据时可能出现没有数据可以复制的情况,此时这些复制器 Replicator 会注册一个回调监听新的数据到来,而通知这些监听器的时机则位于此
    NodeImpl.LeaderStableClosure#run
     A)如果响应状态是 OK,则上述回调会执行 BallotBox#commitAt 方法检查该批次中的日志数据是否被过半数的节点所成功复制,如果存在复制成功的日志数据,则递增 lastCommittedIndex 值,并向状态机发布 COMMITTED 事件。
     B)BallotBox#commitAt(方法 BallotBox#commitAt 除了会被 Leader 节点调用,也会在 Follower 节点完成日志数据复制的 AppendEntries 请求响应处理期间被调用,此时也会触发检查 granted 操作)
      B-1)遍历检查当前批次中的 LogEntry 是否有成功被过半数节点复制的,如果当前 LogEntry 被过半数节点成功复制,记录 lastCommittedIndex
      B-2)剔除已经被过半数节点复制的 LogIndex 对应的选票,Raft 保证一个 LogEntry 被提交之后,在此之前的 LogEntry 一定是 committed 状态。更新集群的 lastCommittedIndex 值
      B-3)向状态机发布 COMMITTED 事件(业务向 JRaft 集群提交的 Task 在被转换成日志并成功复制给集群中的过半数以上节点(即对应的日志被提交)之后,接下去就需要将这些日志中存储的指令透传给业务状态机)
       B-3-1)FSMCaller#onCommitted -> FSMCallerImpl#doCommitted
       B-3-2)获取最新被状态机应用的 LogEntry 对应的 logIndex 值lastAppliedIndex
       B-3-3)当前 committedIndex 对应的 LogEntry 已经被处理过,无需重复处理
       B-3-4)获取 committedIndex 之前的 Task 的回调列表,填充到 closures 集合中,如果是 TaskClosure 类型,则顺便记录到 taskClosures 中,主要是为了回调 TaskClosure#onCommitted 方法。对于 TaskClosure 类型的 Task 回调,应用 TaskClosure#onCommitted 方法。
       B-3-5)迭代处理 LogEntry,连续处理一批业务操作产生的日志,应用 StateMachine#onApply 方法
       B-3-6)更新最新应用的日志对应的 logIndex (lastAppliedIndex)和 term 值,通知 LogManager,这些已经被应用的 LogEntry 可以从内存中移除了

FSMCaller 本地维护了一个 lastAppliedIndex 字段,用于记录已经被应用(即已将日志中的指令透传给业务状态机)的 LogEntry 对应的 logIndex 值。

在调用 Node#apply 方法向 JRaft 集群提交 Task 时,一般都会给 Task 设置一个回调,即给 Task#done 字段赋值。所以,FSMCaller 对于给定的 committedIndex,首先会调用 ClosureQueueImpl#popClosureUntil 方法获取到这些已经被提交的 LogEntry 对应的 Task 的回调。这些回调最终会透传给业务状态机,由业务决定是响应成功还是失败。

FSMCaller 和 BallotBox 所持有的 ClosureQueue 实例是同一个吗?这些 Task 回调正是在前面调用 BallotBox#appendPendingTask 方法时记录的。

3.日志复制

在将用户操作指令封装成 LogEntry 写入内存之后,日志复制的进程即开始了,与此同时,Leader 节点会以异步的方式将数据落盘。日志复制仍然采用投票机制,当一条日志被集群中过半数以上的节点成功复制之后,这条日志会被打上 committed 标签。此类日志中承载的操作指令最后会被透传给状态机,由业务负责执行。

Leader 节点针对每个 Follower 节点都会在本地为其创建并启动一个复制器 Replicator 实例,而日志复制的过程则全权由 Replicator 负责,各 Replicator 之间相互独立,彼此互不影响。

3.1 pipeline机制

Leader 节点将日志数据复制给 Follower 节点的过程必须保证日志数据的顺序性和连续性。为了达到此目的,最简单的交互模式就是“request -> response -> request”,即每次发送出去一个请求之后必须等待接收并处理完对应的响应之后再发送下一个请求,从交互上保证日志复制的严格串行化。这一设计的优点在于实现简单,但是性能上却不尽如人意。

日志数据复制在 Raft 算法的运行过程中是一项频繁的操作,为了在保证日志复制顺序和连续的前提下尽量提升复制的性能,除了并发的向各个 Follower 或 Learner 节点批量发送数据之外,JRaft 在实现上还引入了 pipeline 机制。这一机制简单而言就是将请求和响应从串行改为并行,请求和响应彼此之间互不阻塞。Leader 节点可以连续的向 Follower 节点发送请求,对于那些已经发送出去还未收到响应的请求,或者已经收到但是还没来得及处理的响应对应的请求将其标记为 inflight,并在成功处理完对应的响应之后去除请求的 inflight 标记。如果期间发生错误或者 Leader 节点宕机,对于这些 inflight 请求会尝试重新发送,以此保证日志数据在复制期间不会漏传给 Follower 节点。Pipeline 机制与 TCP 协议中的滑动窗口算法思想相通,是分布式系统中提升吞吐量的惯用策略,例如 Kafka 生产者在往服务端发送消息时同样采用了类似的机制。

这一机制可能会导致同一个 LogEntry 被多次复制给 Follower 节点,好在 Raft 算法要求日志中的指令必须是幂等的,同时 Raft 算法针对日志数据的冲突解决机制能够保证重复复制的 LogEntry 能够被最后一次复制的 LogEntry 所覆盖。

Replicator 在成功发送一个 RPC 请求之后会调用 Replicator#addInflight 方法将请求相关的信息封装成 Inflight 对象记录到 inflight 队列中。

当接收到请求对应的响应时,Replicator 会执行 Replicator#onRpcReturned 方法处理响应。

  • 1)因为 inflight 请求本质上是一种未完成的请求,有重试的可能,所以当重新发送请求时,之前请求对应的响应即使收到了也应该被忽略。(JRaft 在实现上通过版本策略予以实现。Replicator 定义了一个 Replicator#version 字段,用于标识当前 inflight 队列的版本。当重置 inflight 队列时会自增该版本号,并清空 inflight 队列和响应队列等。)
  • 2)响应的顺序是未知的,但是需要保证处理的顺序,所以对于提前收到的响应需要先缓存起来,必须按照请求发送的顺序而非响应到达的顺序进行处理。
  • 3)需要保证请求序列和响应序列相匹配。

3.2 建立到Follower的连接

ReplicatorGroup#addReplicator建立到各个 Follower 节点之间的复制关系。核心在于调用 Replicator#start 方法创建并启动到目标节点的复制器 Replicator 实例。

Replicator#start

  • 1)创建复制器 Replicator 对象
  • 2)检查到目标节点的连通性
  • 3)布 CREATED 事件
  • 4)启动心跳超时计时器
  • 5)发送探针请求,以获取接下去发往目标节点的正确 logIndex 位置,并启动日志复制进程(Replicator#sendEmptyEntries)

Leader 节点在通过复制器 Replicator 与目标 Follower 节点建立连接后,需要发送一个探针请求,目的是获取 Follower 节点已经拥有的日志位置,以便于接下去向 Follower 节点发送后续的日志数据。方法 Replicator#sendEmptyEntries 接收两个参数,当发送探针请求时会设置参数 isHeartbeat = false,同时设置参数 heartBeatClosure = null。

Replicator#sendEmptyEntries

  • 1)构建 AppendEntries 请求。为 AppendEntries 请求填充基础参数,包括当前节点的 term 值、groupId、节点 ID,以及 committedLogIndex 等等。 如果返回 false 说明待发送的部分日志已经变为快照,需要先给目标节点安装快照。
  • 2)isHeartbeat为true,处理心跳请求。
  • 3)isHeartbeat为false,处理探针请求。向目标节点发送 AppendEntries 请求。
     3-1)NodeImpl#handleAppendEntriesRequest目标Follower节点处理请求。通过这一系列的校验过程,Follower 节点会在针对当前探针请求的响应中附上本地已知的最新 logIndex 和 term 值。
      A)如果当前节点处于非活跃状态,则响应错误;
      B)否则,解析请求来源节点的节点 ID,如果解析失败则响应错误;
      C)否则,校验请求中的 term 值是否小于当前节点,如果是则拒绝请求;
      D)否则,基于请求和当前节点本地状态判断是否需要执行 stepdown 操作;
      E)判断请求来源节点是否是当前节点所认可的 Leader 节点,如果不是则说明可能出现网络分区,尝试将响应中的 term 值加 1,以触发请求节点执行 stepdown 操作;
      F)否则,更新本地记录的最近一次收到来自 Leader 节点的时间戳;
      G)校验最近一次完成复制的 LogEntry 对应的 term 值是否与本地相匹配,如果不匹配则拒绝请求,并返回本地已知的最新 logIndex 值;
      H)否则,依据请求中的 committedIndex 值更新本地的 committedIndex 值,同时响应请求,返回本地已知的最新 logIndex 和 term 值。
     3-2)Leader 节点对于上述 Follower 节点响应的处理过程Replicator#onAppendEntriesReturned
      A)校验 inflight 请求与响应中记录的请求对应的已经完成复制的 logIndex 是否一致,如果不是则需要重置 inflight 队列,并重新发送探针请求;
      B)否则,如果目标 Follower 节点运行异常,则同样需要重置 inflight 队列,并重新发送探针请求;
      C)否则,说明目标 Follower 节点运行正常,但是目标节点可以同意当前请求,也可以拒绝当前请求,需要分别处理。
      C-1)如果目标 Follower 节点拒绝当前请求 ,按照之前对于 Follower 节点处理 AppendEntries 探针请求过程的分析可知可能包含以下原因:
       C-1-1)Follower 节点本地的 term 值相对于当前 Leader 节点更大。
       C-1-2)Follower 节点本地记录的 Leader 节点 ID 并不是当前 Leader 节点,即可能出现网络分区。
       C-1-3)Follower 节点与当前 Leader 节点的日志数据存在冲突。
       针对原因 1 和 2,说明集群中已经有更新的 Leader 节点,此时当前节点需要销毁对应的复制器 Replicator 实例,并执行 stepdown 操作。
       针对原因 3 需要分为两类情况:
       原因1)如果目标 Follower 节点本地最新的 logIndex 相对于当前复制器 Replicator 记录的 nextIndex 要小,则需要修正 nextIndex 之后重新发送探针请求。
       原因2)如果目标 Follower 节点本地最新的 logIndex 相对于当前复制器 Replicator 记录的 nextIndex 相等或更大,说明目标 Follower 节点包含老的 Leader 节点复制的日志,此时需要递减 nextIndex 值并重新发送探针请求,以解决日志冲突问题。
      C-2)如果目标 Follower 节点同意当前请求 ,则说明 Follower 节点确认当前复制器 Replicator 实例记录的 nextIndex 值是正确的,无需修正 nextIndex 值,接下去可以继续执行往目标 Follower 节点复制日志的操作。
  • 4)将当前请求标记为 inflight,并记录到 inflight 队列中

3.3 复制日志

如果刚刚分析的 Replicator#onAppendEntriesReturned 方法处理探针请求对应的响应正常,即返回 true,那么接下去就会触发日志复制的进程,即调用 Replicator#sendEntries 方法开始往目标 Follower 节点复制日志数据。

Replicator#sendEntries

  • 1)获取下一个待发送的 LogEntry 对应的 logIndex 值,如果返回 -1 表示暂停复制(太多 inflight 请求,暂停发送新的 AppendEntries 请求)。
  • 2)向目标节点复制 nextSendingIndex 位置之后的 LogEntry 数据。Replicator#sendEntries 的重载版本 Replicator#sendEntries(long) 用于从指定 logIndex 位置开始从本地获取对应的 LogEntry 数据并复制给目标 Follower 节点,整体过程与前面介绍的发送探针请求 Replicator#sendEmptyEntries 方法基本类似。
  • 3)Follower 节点对于复制日志数据 AppendEntries 请求的处理过程,位于 NodeImpl#handleAppendEntriesRequest 方法中。针对复制日志数据的 AppendEntries 请求,Follower 节点会基于请求中的 LogEntry 元数据和数据体信息逐一解析构造对应的 LogEntry 对象,并调用 LogManager#appendEntries 方法批量的将日志数据写入本地存储系统。日志数据写入磁盘是一个异步的过程,当日志数据成功在 Follower 节点落盘之后,Follower 节点会向 Leader 节点发送 AppendEntries 响应。
  • 4)Leader 节点针对复制日志数据的 AppendEntries 请求响应的处理过程。Replicator#onAppendEntriesReturned。如果是针对复制日志数据的 AppendEntries 请求的响应,如果响应来自 Follower 节点则会触发执行 BallotBox#commitAt 方法以检查当前批次的日志数据是否能够被提交,即是否有超过半数的节点完成了对于该批次日志数据的复制操作,如果是则会触发 Leader 节点将该批次的日志数据标记为 committed。

JRaft 在 AppendEntries 请求的设计上采用了将 LogEntry 的元数据和数据体相分离的策略,所以上述从本地加载日志数据的过程会先填充元数据,再填充数据体。实现上使用 RecyclableByteBufferList 作为数据体的载体,RecyclableByteBufferList 可以看做是一个可回收利用的 ByteBuffer 链表,实现层面借鉴了 Netty 的轻量级对象池的设计思想。

如果从本地获取不到 logIndex 对应的日志数据,那么可能存在两种原因:

  • 需要复制的数据已经变为快照形式存储。
  • 没有可以复制的数据。

针对第一种情况直接给目标 Follower 节点安装快照即可,针对第二种情况则立即退出当前 Replicator#sendEntries 方法,并设置一个回调等待新的日志数据。如果本地最新的 logIndex 超过该期望值则说明有新的日志数据可以被复制,会触发执行 Replicator#continueSending 操作。

3.4 心跳机制

复制器 Replicator 中的字段 Replicator#lastRpcSendTimestamp 用于记录最近一次成功向目标 Follower 节点发送 RPC 请求的时间戳。探针请求和复制日志数据请求都会在处理请求响应时更新该字段,不过仅靠这两类请求触发时间戳更新显然是不够的,毕竟整个 JRaft 集群不会始终处于频繁的日志数据复制状态。为此,JRaft 还在复制器 Replicator 中实现了一套心跳机制。

Replicator#start执行 Replicator#startHeartbeatTimer 方法启动心跳计时器,该计时器会延迟指定时间(默认为 100ms)执行 Replicator#onTimeout 操作给当前复制器添加一个 ETIMEDOUT 事件。Replicator 对于此类事件的处理逻辑就是调用 Replicator#sendHeartbeat 方法向目标 Follower 节点发送心跳请求。

心跳请求本质上还是一个空的 AppendEntries 请求,核心是方法 Replicator#sendEmptyEntries。

Follower 节点对于心跳请求的处理逻辑与探针请求一致。默认的心跳请求响应回调逻辑,由 Replicator#onHeartbeatReturned 方法实现。

如果目标 Follower 节点运行异常,则不应该更新复制器 Replicator 的 Replicator#lastRpcSendTimestamp 字段,这无可厚非。如果目标 Follower 节点运行正常,但是拒绝当前的心跳请求,按照之前的总结分为以下三种原因:

  • 1)Follower 节点本地的 term 值相对于当前 Leader 节点更大。
  • 2)Follower 节点本地记录的 Leader 节点 ID 并不是当前 Leader 节点,即可能出现网络分区。
  • 3)Follower 节点与当前 Leader 节点的日志数据存在冲突。

其中只有第三种情况会在响应中携带 Follower 节点的最新 logIndex 值,此时心跳请求会触发向目标 Follower 节点发送探针请求,并在探针请求响应中更新 RPC 发送时间戳。然而,不管是上述哪种原因导致 Follower 节点拒绝响应,亦或是同意响应,复制器 Replicator 都会再次调用 Replicator#startHeartbeatTimer 方法进入下一轮心跳进程。

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容