1.RpcServer
添加系统自带的processors。
RaftRpcServerFactory#addRaftRequestProcessors()
AppendEntriesRequestProcessor
GetFileRequestProcessor
InstallSnapshotRequestProcessor
RequestVoteRequestProcessor
PingRequestProcessor
TimeoutNowRequestProcessor
ReadIndexRequestProcessor
AddPeerRequestProcessor
RemovePeerRequestProcessor
ResetPeerRequestProcessor
ChangePeersRequestProcessor
GetLeaderRequestProcessor
SnapshotRequestProcessor
TransferLeaderRequestProcessor
GetPeersRequestProcessor
AddLearnersRequestProcessor
RemoveLearnersRequestProcessor
ResetLearnersRequestProcessor
另外可以注册自定义的业务processor。
2.启动一个 JRaft 节点需要指定的核心参数
- 数据存储根路径,用于存储日志、元数据,以及快照数据。
- 组 ID,一个组可以看做是一个独立的 Raft 集群,JRaft 支持 MULTI-RAFT-GROUP。
- 节点地址,即当前节点的 IP 和端口号。
- 初始集群节点列表,即初始构成 JRaft 集群的节点列表。
3.初始化和启动
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// start raft node
this.node = this.raftGroupService.start();
3.1 NodeImpl#init
创建NodeImpl,并调用NodeImpl#init()方法。
public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {
final Node ret = createRaftNode(groupId, serverId);
if (!ret.init(opts)) {
throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");
}
return ret;
}
NodeImpl#init()是个非常重要的方法,创建了很多核心组件:
- 创建并初始化延时任务调度器 TimerManager,主要用于处理内部的延时任务(与周期性任务相区分)。
- 创建计时器,用于执行周期性任务,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及快照周期性生成计时器(snapshotTimer)。
- 创建集群节点配置管理器 ConfigurationManager,并初始化集群节点配置信息。
- 初始化 Task 处理相关的 disruptor 队列,用于异步处理业务调用 Node#apply 方法向集群提交的 Task 列表。
- 初始化日志数据存储模块,并对日志数据执行一致性校验。
- 初始化元数据存储模块。
- 初始化快照数据存储模块。
- 创建并初始化状态机调度器 FSMCaller。
- 创建并初始化选票箱 BallotBox。
- 创建并初始化复制器管理组 ReplicatorGroup。
- 创建并初始化 RPC 客户端 RaftClientService。
- 创建并初始化只读服务 ReadOnlyService,用于支持线性一致性读。
- 如果启用了快照生成机制,则启动周期性快照生成任务。
- 如果初始集群节点不为空,则尝试执行角色降级(stepdown),以对本地状态进行初始化,并启动预选举计时器。
- 如果集群只有当前这一个节点,则尝试选举自己为 Leader。
3.1.1 周期任务调度器RepeatedTimer
对于Timer 接口,JRaft 提供了 DefaultTimer 和 HashedWheelTimer 两个实现类,其中前者基于 JDK 内置的 ScheduledExecutorService 实现,后者则基于单层时间轮算法实现。相对而言,HashedWheelTimer 较 DefaultTimer 在性能和精度层面表现更优,所以 JRaft 将其作为默认 Timer 应用于 RepeatedTimer 中。
RepeatedTimer#start
-> RepeatedTimer#schedule
-> 创建TimerTask,提交给timer(默认是HashedWheelTimer)执行,这里会调用 adjustTimeout 方法,用于调整计时周期(避免一直无法选出Leader)
-> RepeatedTimer#run
-> 会调用业务逻辑onTrigger(),如果本次任务调度完成,重新发起调度下一轮任务。计时器停止时回调onDestroy方法。
3.1.2 数据存储
JRaft 的数据存储层主要包含对三类数据的存储:日志数据、元数据,以及快照数据。
- 日志数据存储的 LogEntry 数据,包含系统内部运行产生的日志,以及业务向集群提交 Task 所生成的日志,日志数据默认采用 RocksDB 进行存储;
- 元数据用于记录当前节点的 currentTerm 值,以及投票 votedFor 信息;
- 快照数据是对日志数据存储的一种优化手段,用于将那些已经被应用的日志进行压缩存储,以节省磁盘空间占用,同时缩短新接入节点同步集群数据的时间。
1)日志数据存储
NodeImpl#initLogStorage
-> 创建logStorage,基于 RocksDBLogStorage 实现类
-> 创建logManager,实现类:LogManagerImpl
-> LogManagerImpl#init
LogManagerImpl#init
-> this.logStorage.init()初始化日志存储服务
-> 基于日志初始化本地 logIndex 和 term 值
-> 创建对应的 Disruptor 队列,用于异步处理日志操作相关的事件,包括获取最新的 LogId、日志截断、重置日志数据存储服务,以及关闭日志管理器等。方法 LogManagerImpl#offerEvent 定义了往该 Disruptor 消息队列发送消息的逻辑,而具体处理消息的逻辑则有 StableClosureEventHandler 类实现。
RocksDBLogStorage#init
-> RocksDBLogStorage#initAndLoad 打开本地存储引擎 RocksDB,并从本地 conf 日志中恢复集群节点配置和 firstLogIndex 数据
-> RocksDBLogStorage#openDB,打开 RocksDB,并初始化对应的 ColumnFamily。两个ColumnFamily(名为 Configuration 的 ColumnFamily 用于存储集群节点配置相关的 LogEntry 实例;默认的 ColumnFamily 除了包含 Configuration 中的数据之外,还用于存储用户数据相关的 LogEntry 实例)
-> RocksDBLogStorage#load,从 conf 中加载集群节点配置,以及 firstLogIndex 值,并从本地剔除 firstLogIndex 之前的 conf 和 data 数据
另外,在NodeImpl#init会调用LogManagerImpl#checkConsistency方法对日志数据进行一致性校验。
LogManagerImpl#checkConsistency
-> 校验的逻辑主要是确保快照数据与当前数据的连续性,不允许存在数据断层。
2)元数据存储
元数据包括 currentTerm 值和当前节点的 votedId 信息。LocalRaftMetaStorage 实现类,基于本地文件系统采用 protobuf 协议对元数据执行序列化之后进行存储。LocalRaftMetaStorage 在初始化时(即执行 LocalRaftMetaStorage#init 方法期间)会从本地文件系统加载并反序列化元数据,以初始化 currentTerm 和 votedFor 属性值。运行期间对于这两个属性值的更改全部记录在内存中,并在关闭时(即执行 LocalRaftMetaStorage#shutdown 方法期间)将内存中的数据序列化后落盘。
NodeImpl#initMetaStorage
-> 实例化元数据存储服务,基于 LocalRaftMetaStorage 实现类
-> this.metaStorage.init()
-> 基于本地元数据恢复 currentTerm 和 votedId 属性值
3)快照数据存储
与日志数据存储模块的设计相类似,JRaft 针对快照数据存储模块同样采用了操作与存储相分离的策略,其中 SnapshotExecutor 主要负责生成和安装快照,而 SnapshotStorage 则主要负责针对快照文件的读写,以及从远端 Leader 节点拷贝快照数据。
NodeImpl#initSnapshotStorage
-> 创建和初始化SnapshotExecutorImpl
-> SnapshotExecutorImpl#init
SnapshotExecutor 的初始化过程主要是从本地加载最新的快照文件数据。
SnapshotExecutorImpl#init
-> 创建快照存储服务,基于 LocalSnapshotStorage 实现类
-> 初始化快照存储服务,主要工作是从本地删除除最后一次快照所生成的快照文件之外的其它快照数据文件
-> 打开快照文件读取器,加载快照元数据信息,加载最近一次的快照数据:FSMCaller#onSnapshotLoad 方法的实现
3.1.3 StateMachine和FSMCaller
JRaft 通过StateMachine接口抽象描述了 Raft 算法中引入的状态机。这也是 JRaft 向业务透传自己运行状态的核心接口,业务可以通过该接口捕获 JRaft 的运行事件。除了最核心的应用 LogEntry 中的指令外,还包括当前节点作为 LEADER 或 FOLLOWER 角色的启停事件、集群节点配置变更、快照加载与存储,以及集群运行错误与停机等。
JRaft 是如何将这些事件通知到业务的呢?具体点来说,通知到业务实现的状态机的呢?这就是状态机调度器 FSMCaller 所做的工作。
JRaft 通过调用 FSMCaller 中声明的方法实现将内部运行状态透传给业务,而 FSMCaller 在本地则基于 Disruptor 消息队列以事件的形式缓存这些内部状态,并通过异步的方式回调 StateMachine 接口声明的相应方法,这就是 FSMCaller 整体的运行逻辑。
NodeImpl#initFSMCaller
-> this.closureQueue = new ClosureQueueImpl(this.groupId),创建封装 Closure 的队列,基于 LinkedList 实现
-> FSMCallerImpl#init,创建和启动了一个 Disruptor 队列,用于异步处理各种状态机事件
FSMCallerImpl#enqueueTask 用于往该 Disruptor 队列写入具体的状态机事件,而 FSMCallerImpl 实现的FSMCaller 接口中声明的方法,基本上都是简单的调用了该方法。
ApplyTaskHandler处理消息。ApplyTaskHandler 通过调用 FSMCallerImpl#runApplyTask 方法对 Disruptor 消息队列中缓存的状态机事件进行处理。该方法本质上是一个事件分发器,基于具体的状态机事件类型调用对应的 do* 方法实现对事件的处理操作。
3.1.4 选票箱BallotBox
投票机制是 Raft 算法运行的基础,JRaft 在实现上为每个节点都设置了一个选票箱 BallotBox 实例,用于对 LogEntry 是否提交进行仲裁。
BallotBox 中持有的 ClosureQueue 实例是在前面介绍的 NodeImpl#initFSMCaller 中创建的,所以 FSMCaller 和 BallotBox 对象持有的 ClosureQueue 实例是同一个。BallotBox 负责往 ClosureQueue 中写数据,而 FSMCaller 则负责从 ClosureQueue 中读数据。