本篇主要分三部分讨论Redis主从复制的实现原理:主从复制过程、状态机、源码解析。Redis从节点使用了状态机机制,来实现从节点不同状态的切换,所以在解析源码之前,会先讨论下状态机的基本原理。
1. 主从复制过程
Redis 的 RDB 和 AOF 机制保证了服务的可靠性,而为了让服务实现高可用,Redis 使用了主从复制,而主从复制也是MySQL等数据库或其他存储系统实现高可用的方法。
为了保证数据副本的一致性,主从库之间采用的是读写分离的方式:
- 读操作:主库、从库都可以接收;
- 写操作:首先到主库执行,然后,主库将写操作同步给从库。
1.1. 主从数据同步
当我们启动多个 Redis 实例的时候,它们相互之间就可以通过 replicaof(Redis 5.0 之前使用 slaveof)命令形成主库和从库的关系。
例如,现在有实例 1(ip:172.16.19.3)和实例 2(ip:172.16.19.5),我们在实例 2 上执行以下这个命令后,实例 2 就变成了实例 1 的从库,并从实例 1 上复制数据:
replicaof 172.16.19.3 6379
主从实例会按照三个阶段完成数据同步:建立连接、全量复制、增量复制。下面对这三个阶段进行介绍:
-
建立连接:第一阶段是主从库间建立连接、协商同步的过程,主要是为全量复制做准备。在这一步,从库和主库建立起连接,并告诉主库即将进行同步,主库确认回复后,主从库间就可以开始同步了。这一步主要包含从库给主库发送 psync 命令,以及主库响应 FULLRESYNC 命令。
从库发送 psync 命令:携带主库runID 和 复制进度 offset 两个参数。因为从库第一次连接主库,并不知道主库的 runID,因此这时候 runID = ?;而 offset 值为 -1,表示这是第一次复制。
主库响应 FULLRESYNC 命令:主库 runID 和 目前的复制进度 offset。FULLRESYNC 响应表示接下来将会进行全量复制。
-
全量复制:全量复制也会进行两个操作,主实例把所有的数据传输给从库,从库接收主库的所有数据,并在本地完成数据加载。具体来说就是:
主库执行 bgsave 命令,生成 RDB 文件,接着将文件发给从库。主实例在执行 bgsave 命令时,会 fork 一个子进程来生成 RDB 文件,这块内容后面再单独讨论吧。
从库接收到 RDB 文件后,会先清空当前数据库,然后加载 RDB 文件。这是因为从库在通过 replicaof 命令开始和主库同步前,可能保存了其他数据。为了避免之前数据的影响,从库需要先把当前数据库清空。
增量复制:第二阶段全量复制会是一个比较耗时的操作,而在进行全量复制时,主实例仍然在接收新的写命令,而这些命令是不会被写到 RDB 文件中的,具体为什么不会被写到 RDB 文件中,可以参考https://www.jianshu.com/p/f700dbd572a5 里面的 fork 和写时复制相关技术。因此就需要 replication buffer 这样一块缓冲区,来保存第二阶段执行期间,主实例接收的写操作。并在第二阶段执行结束之后,把 replication buffer 缓冲区中的数据发送给从节点。
1.2. 网络故障之后的数据同步
在 Redis 2.8 之前,如果主从库在命令传播时出现了网络闪断,那么,从库就会和主库重新进行一次全量复制,开销非常大。
从 Redis 2.8 开始,网络断了之后,主从库会采用增量复制的方式继续同步。听名字大概就可以猜到它和全量复制的不同:全量复制是同步所有数据,而增量复制只会把主从库网络断连期间主库收到的命令,同步给从库。
具体过程如下图所示:
- 当主从库断连后,主库会把断连期间收到的写操作命令,写入 replication buffer,同时也会把这些操作命令也写入 repl_backlog_buffer 这个缓冲区。repl_backlog_buffer 是一个环形缓冲区,主库会记录自己写到的位置master_repl_offset,从库则会记录自己已经读到的位置 slave_repl_offset 。
- 主从库的连接恢复之后,从库首先会给主库发送 psync 命令,并把自己当前的 slave_repl_offset 发给主库,主库只用把 master_repl_offset 和 slave_repl_offset 之间的命令操作同步给从库就行。
因为 repl_backlog_buffer 是一个环形缓冲区,所以在缓冲区写满后,主库会继续写入,此时,就会覆盖掉之前写入的操作。如果从库的读取速度比较慢,就有可能导致从库还未读取的操作被主库新写的操作覆盖了,这会导致主从库间的数据不一致。
2. 状态机
在实际的软件开发中,状态模式并不是很常用,但是在能够用到的场景里,它可以发挥很大的作用。状态模式一般用来实现状态机,而状态机常用在游戏、工作流引擎等系统开发中。
有限状态机,英文翻译是 Finite State Machine,缩写为 FSM,简称为状态机。状态机有 3 个组成部分:状态(State)、事件(Event)、动作(Action)。其中,事件也称为转移条件(Transition Condition)。事件触发状态的转移及动作的执行。不过,动作不是必须的,也可能只转移状态,不执行任何动作。
拿《超级玛丽》举个例子哈,玛丽可以有多种状态,比如小玛丽,吃了蘑菇之后,就变成了大玛丽并且增加100积分;而如果碰到了野怪,小玛丽就直接over了,大玛丽就会变成小玛丽并且减少100积分。这个例子里面呢,小玛丽或者大玛丽都是状态机的状态,加减积分就是动作,吃蘑菇或者撞野怪就是事件。
简化后的部分状态和事件如下图所示:
2.1. 分支实现
如果将上面描述的简易版超级玛丽用代码实现,简单直接的实现方式是,参照状态转移图,将每一个状态转移,原模原样地直译成代码。这样编写的代码会包含大量的 if-else 或 switch-case 分支判断逻辑,甚至是嵌套的分支判断逻辑。
public enum State {
SMALL(0),
SUPER(1),
OVER(2);
private int value;
private State(int value) {
this.value = value;
}
public int getValue() {
return this.value;
}
}
public class MarioStateMachine {
private int score;
private State currentState;
public MarioStateMachine() {
this.score = 0;
this.currentState = State.SMALL;
}
public void obtainMushRoom() {
if (currentState.equals(State.SMALL)) {
this.currentState = State.SUPER;
this.score += 100;
} else {
this.score += 100;
}
}
public void meetMonster() {
if (currentState.equals(State.SUPER)) {
this.currentState = State.SMALL;
this.score -= 100;
} else {
this.currentState = State.OVEW;
this.score = 0;
}
}
public int getScore() {
return this.score;
}
public State getCurrentState() {
return this.currentState;
}
}
public class ApplicationDemo {
public static void main(String[] args) {
MarioStateMachine mario = new MarioStateMachine();
mario.obtainMushRoom();
int score = mario.getScore();
State state = mario.getCurrentState();
System.out.println("mario score: " + score + "; state: " + state);
}
}
对于简单的状态机来说,分支逻辑这种实现方式是可以接受的。但是,对于复杂的状态机来说,这种实现方式极易漏写或者错写某个状态转移。除此之外,代码中充斥着大量的 if-else 或者 switch-case 分支判断逻辑,可读性和可维护性都很差。如果哪天修改了状态机中的某个状态转移,我们要在冗长的分支逻辑中找到对应的代码进行修改,很容易改错,引入 bug。
2.2. 状态模式
状态模式通过将事件触发的状态转移和动作执行,拆分到不同的状态类中,来避免分支判断逻辑。我们还是结合代码来理解这句话。
其中,IMario 是状态的接口,定义了所有的事件。SmallMario、SuperMario是 IMario 接口的实现类,分别对应状态机中的不同的状态。原来在状态机MarioStateMachine中定义的事件处理逻辑,现在分散到了 IMario 的实现类里面。
public interface IMario {
State getName();
void obtainMushRoom(MarioStateMachine stateMachine);
void meetMonster(MarioStateMachine stateMachine);
}
public class SmallMario implements IMario {
private static final SmallMario instance = new SmallMario();
private SmallMario() {}
public static SmallMario getInstance() {
return instance;
}
@Override
public State getName() {
return State.SMALL;
}
@Override
public void obtainMushRoom(MarioStateMachine stateMachine) {
stateMachine.setCurrentState(SuperMario.getInstance());
stateMachine.setScore(stateMachine.getScore() + 100);
}
@Override
public void meetMonster(MarioStateMachine stateMachine) {
stateMachine.setCurrentState(OverMario.getInstance());
}
}
// 省略SuperMario类、OverMario类...
public class MarioStateMachine {
private int score;
private IMario currentState;
public MarioStateMachine() {
this.score = 0;
this.currentState = SmallMario.getInstance();
}
public void obtainMushRoom() {
this.currentState.obtainMushRoom(this);
}
public void meetMonster() {
this.currentState.meetMonster(this);
}
public int getScore() {
return this.score;
}
public State getCurrentState() {
return this.currentState.getName();
}
public void setScore(int score) {
this.score = score;
}
public void setCurrentState(IMario currentState) {
this.currentState = currentState;
}
}
其实状态机还有一种实现方式为查表法,但是个人感觉这种查表法的应用场景非常有限,这里就不详细介绍了,有兴趣可以看下极客时间里面王争老师的专栏《设计模式之美》。
3. 源码解析
Redis 5.0 源码地址:https://github.com/redis/redis/tree/5.0
Redis 主从复制过程中,从节点会处于初始化、建立连接、握手验证、增量复制、全量复制等多个不同的状态。Redis 就是使用了基于状态机的设计思想,来清晰的实现不同状态间的跳转。因为主从复制过程中的状态比较多,很难把每一个状态都说清楚,这里只讨论下关键的几个状态及状态间的跳转。
3.1. 数据结构及初始化
每一个 Redis 实例在代码中都对应一个 redisServer 结构体,这个结构体包含了和 Redis 实例相关的各种配置,比如实例的 RDB、AOF 配置、主从复制配置、切片集群配置等。然后,与主从复制状态机相关的变量是 repl_state,Redis 在进行主从复制时,从库就是根据这个变量值的变化,来实现不同阶段的执行和跳转。
struct redisServer {
...
/* 复制相关(slave) */
char *masterauth; /* 用于和主库进行验证的密码*/
char *masterhost; /* 主库主机名 */
int masterport; /* 主库端口号r */
…
client *master; /* 从库上用来和主库连接的客户端 */
client *cached_master; /* 从库上缓存的主库信息 */
int repl_state; /* 从库的复制状态机 */
...
}
个人理解这里的 repl_state 就相当于 2.1 中的 State 枚举类,定义了从库的不同状态。这里有一点需要说明哈,就是主从复制的状态机都是在从节点上才有,主节点是没有状态机的,到后面会讨论主节点为什么没有状态机这个问题。
接下来说下初始化,首先,当一个实例启动后,就会调用 server.c 中的 initServerConfig 函数,初始化 redisServer 结构体。此时,实例会把状态机的初始状态设置为 REPL_STATE_NONE,如下所示:
void initServerConfig(void) {
…
server.repl_state = REPL_STATE_NONE;
…
}
然后,一旦实例执行了 replicaof 172.16.19.3 6379 命令,就会调用 replication.c 中的 replicaofCommand 函数进行处理。replicaofCommand 函数会调用 replicationSetMaster 函数设置主库的信息。这部分的代码逻辑如下所示:
void replicaofCommand(client *c) {
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
......
} else {
/* 如果没有记录主库的IP和端口号,设置主库的信息 */
replicationSetMaster(c->argv[1]->ptr, port);
......
}
addReply(c,shared.ok);
}
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
......
server.masterhost = sdsnew(ip);
server.masterport = port;
......
server.repl_state = REPL_STATE_CONNECT;
}
这里就是设置刚才数据结构里面提到的 host 和 port 两个变量,并把状态机的状态设置为 REPL_STATE_CONNECT。
3.2. 状态的跳转
Redis 的周期性任务,就是指 Redis 实例在运行时,按照一定时间周期重复执行的任务。Redis 的周期性任务很多,其中之一就是 replicationCron() 任务。这个任务的执行频率是每 1000ms 执行一次,如下面的代码所示:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
…
run_with_period(1000) replicationCron();
…
}
这个 serverCron 函数在 server.c 函数中,他会在 Redis 实例启动的 main 函数执行时候,注册一个时间事件,该时间事件会立即被触发,触发后的回调函数就是这个 serverCron 函数。这一块的详细内容,后面疫情过去到公司了,再把事件驱动框架的分析贴出来,这里只需要知道这个函数会在 Redis 启动之后执行,并按照一定周期来执行相应的任务就行。
接下来再来看下 replicationCron 函数,他是在 replication.c 文件中,在这个函数里面,判断从节点状态机的状态为 REPL_STATE_CONNECT 时,会和主节点建立连接,如下所示:
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
......
/* Check if we should connect to a MASTER */
/* 如果从库实例的状态是REPL_STATE_CONNECT,那么从库通过connectWithMaster和主库建立连接 */
//3.1 小节中有分析过,执行了replicaof 之后,会把从库的状态机设置为 REPL_STATE_CONNECT,因此就会首先执行这个分支
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
......
}
connectWithMaster 函数中,首先和主节点建立连接,返回一个文件描述符 fd,当连接 fd 上有事件发生时,会触发 syncWithMaster 回调函数,方法返回前,会给状态机的状态设置为 REPL_STATE_CONNECTING。
int connectWithMaster(void) {
int fd;
//从库和主库建立连接
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
//在建立的连接上注册读写事件,对应的回调函数是syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
//完成连接后,将状态机设置为REPL_STATE_CONNECTING
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
syncWithMaster 函数前面会经过一系列的握手操作,然后会调用 slaveTryPartialResynchronization 函数发送 1.1 小节中提到的 psync 命令,并根据 slaveTryPartialResynchronization 函数的返回值,来执行全量复制,或者让 slaveTryPartialResynchronization 函数执行增量复制
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
//前面有一系列握手操作,这里就不详细介绍了
......
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
//向主库发送PSYNC命令,进行数据同步
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
//读取PSYNC命令的返回结果
psync_result = slaveTryPartialResynchronization(fd,1);
//PSYNC结果还没有返回,先从syncWithMaster函数返回处理其他操作
if (psync_result == PSYNC_WAIT_REPLY) return;
//如果PSYNC结果是PSYNC_CONTINUE,从syncWithMaster函数返回
if (psync_result == PSYNC_CONTINUE) {
…
return;
}
//如果执行全量复制的话,针对连接上的读事件,创建readSyncBulkPayload回调函数
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
…
}
//将从库状态机置为REPL_STATE_TRANSFER
server.repl_state = REPL_STATE_TRANSFER;
}
下面简单介绍下 slaveTryPartialResynchronization 函数,如果从库是第一次和主库连接,则发送 psync 命令,然后读取主库的响应,并根据主库的响应结果,来执行增量复制:
int slaveTryPartialResynchronization(int fd, int read_reply) {
......
/* Writing half */
//发送PSYNC命令,
if (!read_reply) {
......
//从库第一次和主库同步时,设置offset为-1
server.master_initial_offset = -1;
......
/* Issue the PSYNC command */
//调用sendSynchronousCommand发送PSYNC命令
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
......
return PSYNC_WAIT_REPLY;
}
/* Reading half */
//读取主库响应
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
aeDeleteFileEvent(server.el,fd,AE_READABLE);
//主库返回FULLRESYNC
if (!strncmp(reply,"+FULLRESYNC",11)) {
......
return PSYNC_FULLRESYNC;
}
//主库返回CONTINUE,执行增量复制
if (!strncmp(reply,"+CONTINUE",9)) {
......
return PSYNC_CONTINUE;
}
......
return PSYNC_NOT_SUPPORTED;
}
3.3. 主库的操作
在 Redis 实现主从复制时,从库涉及到的状态变迁有很多,包括了发起连接、主从握手、复制类型判断、请求数据等。因此,使用状态机开发从库的复制流程,可以很好地帮助我们实现状态流转。
主从复制的发起方是从库,而对于主库来说,它只是被动式地响应从库的各种请求,并根据从库的请求执行相应的操作,比如生成 RDB 文件或是传输数据等。
而且,从另外一个角度来说,主库可能和多个从库进行主从复制,而不同从库的复制进度和状态很可能并不一样,如果主库要维护状态机的话,那么,它还需要为每个从库维护一个状态机,这个既会增加开发复杂度,也会增加运行时的开销。正是因为这些原因,所以主库并不需要使用状态机进行状态流转。
主库本身是可能发生故障,并要进行故障切换的。如果主库在执行主从复制时,也维护状态机,那么一旦主库发生了故障,也还需要考虑状态机的冗余备份和故障切换,这会给故障切换的开发和执行带来复杂度和开销。而从库维护状态机本身就已经能完成主从复制,所以没有必要让主库再维护状态机了。
参考资料:
- 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
- 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
- 极客时间专栏《设计模式之美》.王争.2020
- Redis 5.0.14 源码:https://github.com/redis/redis/tree/5.0
`