JRaft源码剖析1-节点启动过程

1.RpcServer

添加系统自带的processors。
RaftRpcServerFactory#addRaftRequestProcessors()

AppendEntriesRequestProcessor
GetFileRequestProcessor
InstallSnapshotRequestProcessor
RequestVoteRequestProcessor
PingRequestProcessor
TimeoutNowRequestProcessor
ReadIndexRequestProcessor

AddPeerRequestProcessor
RemovePeerRequestProcessor
ResetPeerRequestProcessor
ChangePeersRequestProcessor
GetLeaderRequestProcessor
SnapshotRequestProcessor
TransferLeaderRequestProcessor
GetPeersRequestProcessor
AddLearnersRequestProcessor
RemoveLearnersRequestProcessor
ResetLearnersRequestProcessor

另外可以注册自定义的业务processor。

2.启动一个 JRaft 节点需要指定的核心参数

  • 数据存储根路径,用于存储日志、元数据,以及快照数据。
  • 组 ID,一个组可以看做是一个独立的 Raft 集群,JRaft 支持 MULTI-RAFT-GROUP。
  • 节点地址,即当前节点的 IP 和端口号。
  • 初始集群节点列表,即初始构成 JRaft 集群的节点列表。

3.初始化和启动

            this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
            // start raft node
            this.node = this.raftGroupService.start();

3.1 NodeImpl#init

创建NodeImpl,并调用NodeImpl#init()方法。

    public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {
        final Node ret = createRaftNode(groupId, serverId);
        if (!ret.init(opts)) {
            throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");
        }
        return ret;
    }

NodeImpl#init()是个非常重要的方法,创建了很多核心组件:

  • 创建并初始化延时任务调度器 TimerManager,主要用于处理内部的延时任务(与周期性任务相区分)。
  • 创建计时器,用于执行周期性任务,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及快照周期性生成计时器(snapshotTimer)。
  • 创建集群节点配置管理器 ConfigurationManager,并初始化集群节点配置信息。
  • 初始化 Task 处理相关的 disruptor 队列,用于异步处理业务调用 Node#apply 方法向集群提交的 Task 列表。
  • 初始化日志数据存储模块,并对日志数据执行一致性校验。
  • 初始化元数据存储模块。
  • 初始化快照数据存储模块。
  • 创建并初始化状态机调度器 FSMCaller。
  • 创建并初始化选票箱 BallotBox。
  • 创建并初始化复制器管理组 ReplicatorGroup。
  • 创建并初始化 RPC 客户端 RaftClientService。
  • 创建并初始化只读服务 ReadOnlyService,用于支持线性一致性读。
  • 如果启用了快照生成机制,则启动周期性快照生成任务。
  • 如果初始集群节点不为空,则尝试执行角色降级(stepdown),以对本地状态进行初始化,并启动预选举计时器。
  • 如果集群只有当前这一个节点,则尝试选举自己为 Leader。

3.1.1 周期任务调度器RepeatedTimer

对于Timer 接口,JRaft 提供了 DefaultTimer 和 HashedWheelTimer 两个实现类,其中前者基于 JDK 内置的 ScheduledExecutorService 实现,后者则基于单层时间轮算法实现。相对而言,HashedWheelTimer 较 DefaultTimer 在性能和精度层面表现更优,所以 JRaft 将其作为默认 Timer 应用于 RepeatedTimer 中。

RepeatedTimer#start
-> RepeatedTimer#schedule
-> 创建TimerTask,提交给timer(默认是HashedWheelTimer)执行,这里会调用 adjustTimeout 方法,用于调整计时周期(避免一直无法选出Leader)
-> RepeatedTimer#run
-> 会调用业务逻辑onTrigger(),如果本次任务调度完成,重新发起调度下一轮任务。计时器停止时回调onDestroy方法。

3.1.2 数据存储

JRaft 的数据存储层主要包含对三类数据的存储:日志数据、元数据,以及快照数据。

  • 日志数据存储的 LogEntry 数据,包含系统内部运行产生的日志,以及业务向集群提交 Task 所生成的日志,日志数据默认采用 RocksDB 进行存储;
  • 元数据用于记录当前节点的 currentTerm 值,以及投票 votedFor 信息;
  • 快照数据是对日志数据存储的一种优化手段,用于将那些已经被应用的日志进行压缩存储,以节省磁盘空间占用,同时缩短新接入节点同步集群数据的时间。

1)日志数据存储

NodeImpl#initLogStorage
-> 创建logStorage,基于 RocksDBLogStorage 实现类
-> 创建logManager,实现类:LogManagerImpl
-> LogManagerImpl#init

LogManagerImpl#init
-> this.logStorage.init()初始化日志存储服务
-> 基于日志初始化本地 logIndex 和 term 值
-> 创建对应的 Disruptor 队列,用于异步处理日志操作相关的事件,包括获取最新的 LogId、日志截断、重置日志数据存储服务,以及关闭日志管理器等。方法 LogManagerImpl#offerEvent 定义了往该 Disruptor 消息队列发送消息的逻辑,而具体处理消息的逻辑则有 StableClosureEventHandler 类实现。

RocksDBLogStorage#init
-> RocksDBLogStorage#initAndLoad 打开本地存储引擎 RocksDB,并从本地 conf 日志中恢复集群节点配置和 firstLogIndex 数据
-> RocksDBLogStorage#openDB,打开 RocksDB,并初始化对应的 ColumnFamily。两个ColumnFamily(名为 Configuration 的 ColumnFamily 用于存储集群节点配置相关的 LogEntry 实例;默认的 ColumnFamily 除了包含 Configuration 中的数据之外,还用于存储用户数据相关的 LogEntry 实例)
-> RocksDBLogStorage#load,从 conf 中加载集群节点配置,以及 firstLogIndex 值,并从本地剔除 firstLogIndex 之前的 conf 和 data 数据

另外,在NodeImpl#init会调用LogManagerImpl#checkConsistency方法对日志数据进行一致性校验。

LogManagerImpl#checkConsistency
-> 校验的逻辑主要是确保快照数据与当前数据的连续性,不允许存在数据断层。

2)元数据存储

元数据包括 currentTerm 值和当前节点的 votedId 信息。LocalRaftMetaStorage 实现类,基于本地文件系统采用 protobuf 协议对元数据执行序列化之后进行存储。LocalRaftMetaStorage 在初始化时(即执行 LocalRaftMetaStorage#init 方法期间)会从本地文件系统加载并反序列化元数据,以初始化 currentTerm 和 votedFor 属性值。运行期间对于这两个属性值的更改全部记录在内存中,并在关闭时(即执行 LocalRaftMetaStorage#shutdown 方法期间)将内存中的数据序列化后落盘。

NodeImpl#initMetaStorage
-> 实例化元数据存储服务,基于 LocalRaftMetaStorage 实现类
-> this.metaStorage.init()
-> 基于本地元数据恢复 currentTerm 和 votedId 属性值

3)快照数据存储

与日志数据存储模块的设计相类似,JRaft 针对快照数据存储模块同样采用了操作与存储相分离的策略,其中 SnapshotExecutor 主要负责生成和安装快照,而 SnapshotStorage 则主要负责针对快照文件的读写,以及从远端 Leader 节点拷贝快照数据。

NodeImpl#initSnapshotStorage
-> 创建和初始化SnapshotExecutorImpl
-> SnapshotExecutorImpl#init

SnapshotExecutor 的初始化过程主要是从本地加载最新的快照文件数据。

SnapshotExecutorImpl#init
-> 创建快照存储服务,基于 LocalSnapshotStorage 实现类
-> 初始化快照存储服务,主要工作是从本地删除除最后一次快照所生成的快照文件之外的其它快照数据文件
-> 打开快照文件读取器,加载快照元数据信息,加载最近一次的快照数据:FSMCaller#onSnapshotLoad 方法的实现

3.1.3 StateMachine和FSMCaller

JRaft 通过StateMachine接口抽象描述了 Raft 算法中引入的状态机。这也是 JRaft 向业务透传自己运行状态的核心接口,业务可以通过该接口捕获 JRaft 的运行事件。除了最核心的应用 LogEntry 中的指令外,还包括当前节点作为 LEADER 或 FOLLOWER 角色的启停事件、集群节点配置变更、快照加载与存储,以及集群运行错误与停机等。

JRaft 是如何将这些事件通知到业务的呢?具体点来说,通知到业务实现的状态机的呢?这就是状态机调度器 FSMCaller 所做的工作。

JRaft 通过调用 FSMCaller 中声明的方法实现将内部运行状态透传给业务,而 FSMCaller 在本地则基于 Disruptor 消息队列以事件的形式缓存这些内部状态,并通过异步的方式回调 StateMachine 接口声明的相应方法,这就是 FSMCaller 整体的运行逻辑。

NodeImpl#initFSMCaller
-> this.closureQueue = new ClosureQueueImpl(this.groupId),创建封装 Closure 的队列,基于 LinkedList 实现
-> FSMCallerImpl#init,创建和启动了一个 Disruptor 队列,用于异步处理各种状态机事件

FSMCallerImpl#enqueueTask 用于往该 Disruptor 队列写入具体的状态机事件,而 FSMCallerImpl 实现的FSMCaller 接口中声明的方法,基本上都是简单的调用了该方法。

ApplyTaskHandler处理消息。ApplyTaskHandler 通过调用 FSMCallerImpl#runApplyTask 方法对 Disruptor 消息队列中缓存的状态机事件进行处理。该方法本质上是一个事件分发器,基于具体的状态机事件类型调用对应的 do* 方法实现对事件的处理操作。

3.1.4 选票箱BallotBox

投票机制是 Raft 算法运行的基础,JRaft 在实现上为每个节点都设置了一个选票箱 BallotBox 实例,用于对 LogEntry 是否提交进行仲裁。

BallotBox 中持有的 ClosureQueue 实例是在前面介绍的 NodeImpl#initFSMCaller 中创建的,所以 FSMCaller 和 BallotBox 对象持有的 ClosureQueue 实例是同一个。BallotBox 负责往 ClosureQueue 中写数据,而 FSMCaller 则负责从 ClosureQueue 中读数据。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容