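Overview
In Redis master-replica mode, when the master fails, switching to a new master requires manual intervention, which reduces the availability of the system. Redis uses the Sentinel mechanism to perform failover automatically. To keep Sentinel itself highly available, sentinels are usually deployed as a cluster. The sentinel cluster monitors the state of every Redis instance in the master-replica setup. When the master fails, the sentinels mark it as down and promote one of the replicas to be the new master, which then takes over request handling. How do sentinels detect that a Redis instance has failed, and how do they carry out the failover? This article walks through how the Sentinel mechanism is implemented.
Sentinel Mode
The sentinel cluster monitors the state of the Redis master and its replicas. When it detects that the master has failed and gone offline, the sentinels hold a vote to elect a leader sentinel. The leader then picks one of the replicas to become the new master in place of the failed one and carries out the failover. The sentinels also keep monitoring the master that went down; when it comes back online, they reconfigure it as a replica of the new master.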
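Sentinel Instance Startup Flow
Sentinel Startup Configuration
Command to start a sentinel instance:
$ redis-sentinel ../sentinel.conf
# or
$ redis-server ../sentinel.conf --sentinel
Core configuration in sentinel.conf used when starting a sentinel instance: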
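# Master to monitor. master1: alias of the master, 127.0.0.1: IP, 6379: port, 1: quorum
# quorum: how many sentinels must consider the master subjectively down before it is considered objectively down
sentinel monitor master1 127.0.0.1 6379 1
# If the instance does not reply for more than 30000 ms, consider it down
sentinel down-after-milliseconds master1 30000
# After a failover, how many replicas may resynchronize with the new master at the same time
sentinel parallel-syncs master1 1
# Failover timeout: if the failover does not complete within this time it is considered failed and will be retried by another sentinel
sentinel failover-timeout master1 180000
# Script to run when an event occurs
sentinel notification-script master1 ../notify.sh
# Script to run when the master changes, so that clients can be notified
sentinel client-reconfig-script master1 ../reconfig.sh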
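As the configuration shows, a sentinel is only told which master to monitor at startup; no replicas or other sentinels are configured. So how does a sentinel discover the replicas, and how do sentinels establish connections with each other? The following sections answer these questions by walking through the sentinel startup flow.
Initializing the Sentinel Instance
A sentinel is a Redis server running in a special mode. Like a regular Redis server, its entry point is the main function in server.c. The difference is that, during execution, main calls checkForSentinelMode to determine whether this is a sentinel instance, and then performs sentinel-specific initialization.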
#server.c
// Check whether the server is being started in sentinel mode
server.sentinel_mode = checkForSentinelMode(argc,argv);
int checkForSentinelMode(int argc, char **argv) {
    int j;
    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    for (j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}
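checkForSentinelMode treats the process as a sentinel in either of two cases: the executable name contains redis-sentinel, or the command-line arguments include --sentinel.
For an instance started in sentinel mode, main calls initSentinelConfig and initSentinel to initialize it: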
#server.c
if (server.sentinel_mode) {
    // Initialize the sentinel configuration
    initSentinelConfig();
    // Initialize the sentinel instance
    initSentinel();
}
initSentinelConfig sets the sentinel's listening port to REDIS_SENTINEL_PORT, a macro defined in sentinel.c with the value 26379. It also sets protected_mode to 0 so that the sentinel can be reached from external networks.
#sentinel.c
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
    server.protected_mode = 0; /* Sentinel must be exposed. */
}
initSentinel clears the regular Redis server command table and replaces it with the sentinel-only command table:
#sentinel.c
void initSentinel(void) {
    unsigned int j;
    // Empty the regular Redis server command tables
    dictEmpty(server.commands,NULL);
    dictEmpty(server.orig_commands,NULL);
    ACLClearCommandID();
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;
        cmd->id = ACLGetCommandID(cmd->name); /* Assign the ID used for ACL. */
        // Register the sentinel-only commands
        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
        retval = dictAdd(server.orig_commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
    }
    // ... (initialization of the global sentinel state omitted)
}
The sentinel command table is the sentinelcmds array in sentinel.c. It contains no SET, GET, DEL or similar data commands, which is why a sentinel does not support those operations:
#sentinel.c
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"fast @connection",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"admin",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"pub-sub",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"pub-sub",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"pub-sub",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"pub-sub",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"pub-sub fast",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"random @dangerous",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"fast read-only @dangerous",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"admin random @connection",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"admin",0,NULL,0,0,0,0,0},
{"auth",authCommand,-2,"no-auth fast @connection",0,NULL,0,0,0,0,0},
{"hello",helloCommand,-1,"no-auth fast @connection",0,NULL,0,0,0,0,0},
{"acl",aclCommand,-2,"admin",0,NULL,0,0,0,0,0,0},
{"command",commandCommand,-1, "random @connection", 0,NULL,0,0,0,0,0,0}
};
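To make the effect of the swapped command table concrete, here is a minimal, self-contained sketch (not Redis code). It assumes a plain array holding the command names listed in sentinelcmds above and a linear case-insensitive scan; Redis itself looks commands up in the server.commands dict. The helper name sentinel_accepts_command is hypothetical.

```c
#include <stddef.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* Hypothetical copy of the command names registered from sentinelcmds[]. */
static const char *const sentinel_command_names[] = {
    "ping", "sentinel", "subscribe", "unsubscribe", "psubscribe",
    "punsubscribe", "publish", "info", "role", "client", "shutdown",
    "auth", "hello", "acl", "command"
};

/* Returns 1 if a sentinel would find `name` in its command table, 0 otherwise.
 * Command names are matched case-insensitively, as Redis does. */
int sentinel_accepts_command(const char *name) {
    size_t n = sizeof(sentinel_command_names) / sizeof(sentinel_command_names[0]);
    for (size_t i = 0; i < n; i++) {
        if (strcasecmp(name, sentinel_command_names[i]) == 0) return 1;
    }
    return 0;
}
```

With a table like this, SET or GET is simply not found, which is why a sentinel answers such commands with an unknown-command error rather than executing them.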
Besides installing the sentinel command table, initSentinel also initializes sentinelState, the global sentinel state that holds the sentinel's runtime information:
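#sentinel.c
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* Sentinel run ID. */
    uint64_t current_epoch;          /* Current epoch. */
    dict *masters;                   /* Hash table of the masters monitored by this sentinel.
                                        Key: master name; value: pointer to a sentinelRedisInstance. */
    int tilt;                        /* Whether we are in TILT mode. */
    int running_scripts;             /* Number of scripts currently running. */
    mstime_t tilt_start_time;        /* When TILT mode was entered. */
    mstime_t previous_time;          /* Last time the timer handler ran. */
    list *scripts_queue;             /* Queue of user scripts waiting to run. */
    char *announce_ip;               /* IP announced to other sentinels. */
    int announce_port;               /* Port announced to other sentinels. */
    unsigned long simfailure_flags;  /* Failure simulation flags. */
    int deny_scripts_reconfig;       /* Whether changing script paths at runtime is denied. */
    char *sentinel_auth_pass;        /* Password used to authenticate to other sentinels. */
    char *sentinel_auth_user;        /* ACL username used to authenticate to other sentinels. */
    int resolve_hostnames;           /* Support using hostnames. */
    int announce_hostnames;          /* Announce hostnames instead of IPs when available. */
} sentinel;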
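The masters hash table in the sentinel state records the masters this sentinel monitors: the key is the master's name and the value is a pointer to a sentinelRedisInstance. A sentinelRedisInstance represents one instance monitored by the sentinel, which may be a master, a replica, or another sentinel. Its core fields are: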
#sentinel.c
typedef struct sentinelRedisInstance {
    int flags;                       /* Instance type and state flags. */
    char *name;                      /* Instance name. */
    char *runid;                     /* Unique run ID of the instance. */
    uint64_t config_epoch;           /* Configuration epoch. */
    sentinelAddr *addr;              /* Instance address. */
    instanceLink *link;              /* Connection to the instance; may be shared by sentinels. */
    mstime_t last_pub_time;          /* Last time we sent a hello via Pub/Sub. */
    mstime_t last_hello_time;        /* Last time we received a hello from this sentinel;
                                        only valid if the SRI_SENTINEL flag is set. */
    mstime_t last_master_down_reply_time; /* Last time a reply to SENTINEL is-master-down-by-addr was received. */
    mstime_t s_down_since_time;      /* Time when the instance was marked subjectively down. */
    mstime_t o_down_since_time;      /* Time when the instance was marked objectively down. */
    mstime_t down_after_period;      /* Instance is subjectively down after this many ms without a reply. */
    mstime_t info_refresh;           /* Time of the last INFO reply received from the instance. */
    /* Master-specific fields */
    dict *sentinels;                 /* Other sentinels monitoring the same master. */
    dict *slaves;                    /* Replicas of this master. */
    unsigned int quorum;             /* Number of sentinels that must agree the master is subjectively down
                                        before it is considered objectively down. */
    int parallel_syncs;              /* How many replicas may sync with the new master at the same time. */
    /* Replica-specific fields */
    mstime_t master_link_down_time;  /* How long the replica's replication link has been down. */
    int slave_priority;              /* Replica priority as reported by INFO. */
    mstime_t slave_reconf_sent_time; /* Time we sent SLAVEOF <new> to this replica during failover. */
    struct sentinelRedisInstance *master; /* For a replica: the master it replicates from. */
    char *slave_master_host;         /* Master host as reported by INFO. */
    int slave_master_port;           /* Master port as reported by INFO. */
    int slave_master_link_status;    /* Master-replica link status as reported by INFO. */
    unsigned long long slave_repl_offset; /* Replica replication offset. */
    char *leader;                    /* For a master: run ID of the sentinel performing the failover.
                                        For a sentinel: run ID of the leader this sentinel voted for. */
    uint64_t leader_epoch;           /* Epoch of the 'leader' field. */
    uint64_t failover_epoch;         /* Epoch of the failover currently in progress. */
    int failover_state;              /* Failover state machine state. */
    mstime_t failover_state_change_time; /* Time of the last failover state change. */
    mstime_t failover_start_time;    /* Time the last failover attempt started. */
    mstime_t failover_timeout;       /* Failover timeout used by this sentinel. */
    mstime_t failover_delay_logged;  /* failover_start_time value for which the failover delay was logged. */
    struct sentinelRedisInstance *promoted_slave; /* Replica being promoted to master. */
    char *notification_script;       /* Script that notifies the admin; NULL if none. */
    char *client_reconfig_script;    /* Script that reconfigures clients; NULL if none. */
    sds info;                        /* Cached INFO output. */
} sentinelRedisInstance;
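flags can be SRI_MASTER, SRI_SLAVE or SRI_SENTINEL, macros defined in sentinel.c that denote a master, a replica and another sentinel respectively; further bits such as SRI_S_DOWN and SRI_O_DOWN record the instance's state. addr points to a sentinelAddr structure, also defined in sentinel.c, which holds the instance's hostname, IP and port: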
#sentinel.c
typedef struct sentinelAddr {
    char *hostname;
    char *ip;
    int port;
} sentinelAddr;
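The instance structure also holds two hash tables of sentinelRedisInstance pointers, sentinels and slaves, which record the other sentinels monitoring the same master and the replicas of that master. When are these two fields populated? That brings us to the core of this article: starting the sentinel instance.
Starting the Sentinel Instance
After the sentinel's configuration and state have been initialized, main calls sentinelIsRunning to start the sentinel. This function first checks whether a sentinel run ID has been set and, if not, generates a random one. It then calls sentinelGenerateInitialMonitorEvents to emit (log and publish) a +monitor event for each monitored master: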
#sentinel.c
void sentinelIsRunning(void) {
    int j;
    // Check whether a sentinel run ID has already been set
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
        if (sentinel.myid[j] != 0) break;
    // Generate a random run ID and persist it to the configuration file
    if (j == CONFIG_RUN_ID_SIZE) {
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }
    /* Log its ID to make debugging of issues simpler. */
    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
    // Emit a +monitor event for each monitored master
    sentinelGenerateInitialMonitorEvents();
}
// Emit a +monitor event for each monitored master
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *di;
    dictEntry *de;
    // Iterate over the masters stored in the sentinel state
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // Emit the +monitor event for this master
        sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
    }
    dictReleaseIterator(di);
}
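sentinelGenerateInitialMonitorEvents calls sentinelEvent to emit the event. sentinelEvent checks whether the format string starts with %@. If it does, it writes the instance's type, name, IP and port into the message and, when the instance is not a master, appends @ followed by the name, IP and port of the master it belongs to; the rest of the format string is then expanded with vsnprintf. The message is logged, published with pubsubPublishMessage (unless it is a debug-level event) and, for warning-level events, passed to the master's notification script: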
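#sentinel.c
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    va_list ap;
    char msg[LOG_MAX_LEN];
    robj *channel, *payload;
    /* Handle %@ */
    if (fmt[0] == '%' && fmt[1] == '@') {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         NULL : ri->master;
        if (master) {
            // Replica or sentinel: "<type> <name> <ip> <port> @ <master-name> <master-ip> <master-port>"
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, announceSentinelAddr(ri->addr), ri->addr->port,
                master->name, announceSentinelAddr(master->addr), master->addr->port);
        } else {
            // Master: "<type> <name> <ip> <port>"
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, announceSentinelAddr(ri->addr), ri->addr->port);
        }
        fmt += 2;
    } else {
        msg[0] = '\0';
    }
    /* Use vsprintf for the rest of the formatting if any. */
    if (fmt[0] != '\0') {
        va_start(ap, fmt);
        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
        va_end(ap);
    }
    /* Log the message if the log level allows it to be logged. */
    if (level >= server.verbosity)
        serverLog(level,"%s",msg);
    /* Publish the message via Pub/Sub if it's not a debugging one. */
    if (level != LL_DEBUG) {
        channel = createStringObject(type,strlen(type));
        payload = createStringObject(msg,strlen(msg));
        // Publish the event on the channel named after the event type
        pubsubPublishMessage(channel,payload);
        decrRefCount(channel);
        decrRefCount(payload);
    }
    // Run the notification script to inform the system administrator
    if (level == LL_WARNING && ri != NULL) {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         ri : ri->master;
        if (master && master->notification_script) {
            sentinelScheduleScriptExecution(master->notification_script,
                type,msg,NULL);
        }
    }
}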
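The %@ prefix rule is easy to misread from the prose alone, so here is a minimal stand-alone sketch of just that formatting step. The struct inst and the function format_at are hypothetical simplifications of sentinelRedisInstance and the first half of sentinelEvent; the real code derives the type string from flags via sentinelRedisInstanceTypeStr and the IP via announceSentinelAddr.

```c
#include <stdio.h>

/* Hypothetical, simplified stand-in for sentinelRedisInstance. */
typedef struct inst {
    const char *type;          /* "master", "slave" or "sentinel" */
    const char *name;
    const char *ip;
    int port;
    const struct inst *master; /* NULL for masters */
} inst;

/* Writes the %@ prefix: the instance itself, followed by "@ <master>" when
 * the instance is not a master. */
void format_at(char *buf, size_t len, const inst *ri) {
    const inst *m = ri->master;
    if (m) {
        snprintf(buf, len, "%s %s %s %d @ %s %s %d",
                 ri->type, ri->name, ri->ip, ri->port,
                 m->name, m->ip, m->port);
    } else {
        snprintf(buf, len, "%s %s %s %d", ri->type, ri->name, ri->ip, ri->port);
    }
}
```

For the +monitor event emitted at startup, the remaining format " quorum %d" is appended after this prefix, so with the sample configuration the published message would read master master1 127.0.0.1 6379 quorum 1.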
At the end of main, aeMain is called; it loops forever, calling aeProcessEvents to handle I/O events and time events:
#ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}
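This article focuses on time events, because the core logic of Sentinel is driven by them. During initialization a time event is created and a callback is registered for it; when the event is processed, its callback is executed: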
#ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0;
    ...
    // Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed;
}
static int processTimeEvents(aeEventLoop *eventLoop) {
    ...
    // Invoke the callback registered for the event
    retval = te->timeProc(eventLoop, id, te->clientData);
    // The callback returns the delay in ms before its next run, or AE_NOMORE to stop
    if (retval != AE_NOMORE) {
        te->when = now + retval * 1000; /* 'now' is in microseconds */
    } else {
        te->id = AE_DELETED_EVENT_ID;
    }
    ...
}
The time event is created in initServer, which registers serverCron as its callback:
#server.c
void initServer(void) {
    ...
    // Create a time event and register serverCron as its callback
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    ...
}
serverCron in turn calls sentinelTimer, the sentinel's time event handler, which performs node monitoring, leader election and failover for the master-replica group:
#server.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    // Run the sentinel timer handler
    if (server.sentinel_mode) sentinelTimer();
    ...
    return 1000/server.hz;
}
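The rescheduling rule above (a callback's return value is the delay in milliseconds before its next run, and serverCron returns 1000/server.hz) can be simulated with a small sketch. Everything here is hypothetical and simplified: a single event, millisecond timestamps instead of ae's microsecond monotonic clock, and a fake cron callback standing in for serverCron.

```c
#define SKETCH_AE_NOMORE -1 /* mirrors AE_NOMORE: "do not reschedule" */

typedef int (*time_proc)(long long id, void *client_data);

typedef struct {
    long long id;
    long long when_ms;   /* absolute time (ms) at which the event fires next */
    time_proc proc;
    void *client_data;
    int deleted;
} time_event;

/* Runs the event if it is due and reschedules it from the callback's
 * return value, like processTimeEvents. Returns 1 if the callback ran. */
int process_time_event(time_event *te, long long now_ms) {
    if (te->deleted || te->when_ms > now_ms) return 0;
    int retval = te->proc(te->id, te->client_data);
    if (retval != SKETCH_AE_NOMORE)
        te->when_ms = now_ms + retval;
    else
        te->deleted = 1;
    return 1;
}

/* Hypothetical stand-in for serverCron: counts calls, asks to run again in 1000/hz ms. */
typedef struct {
    int calls;
    int hz;
} cron_state;

int fake_cron(long long id, void *client_data) {
    (void)id;
    cron_state *st = client_data;
    st->calls++;
    return 1000 / st->hz;
}
```

Starting from a first deadline of 1 ms (as in aeCreateTimeEvent(server.el, 1, ...)) and stepping a simulated clock through one second with hz = 10, the callback runs at 1, 101, ..., 901 ms, i.e. ten times per second.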
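At this point the sentinel instance has been initialized and started. The following sections look at the sentinel's time event handling in detail.
Sentinel Time Events
Periodic Triggering
During startup, initServer calls aeCreateTimeEvent to create a time event that first fires after 1 ms, and adds it to the head of the eventLoop's time event list. aeMain keeps processing time events whose deadline has passed and runs their callbacks periodically. serverCron runs server.hz times per second; hz can be changed in the configuration file and defaults to 10, i.e. 10 runs per second. Each time serverCron runs, it calls the sentinel's handler sentinelTimer, which contains the core sentinel logic. Its implementation is as follows: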
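#sentinel.c
void sentinelTimer(void) {
    // Check whether we need to enter TILT mode
    sentinelCheckTiltCondition();
    // Monitor the instances and handle failover where needed
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    // Run pending scripts
    sentinelRunPendingScripts();
    // Collect finished scripts and schedule retries for failed ones
    sentinelCollectTerminatedScripts();
    // Kill scripts that have timed out
    sentinelKillTimedoutScripts();
    // Randomize the timer frequency so that sentinels do not start elections at the
    // same moment and split the vote, leaving no candidate with a majority
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}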
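The last line of sentinelTimer gives each sentinel a slightly different timer frequency. A small sketch of the arithmetic, assuming CONFIG_DEFAULT_HZ is 10 (its default) and that serverCron reschedules itself every 1000/hz ms: hz lands in [10, 19], so the timer period varies between 52 and 100 ms. The helper names are hypothetical.

```c
#include <stdlib.h>

#define SKETCH_DEFAULT_HZ 10 /* assumed value of CONFIG_DEFAULT_HZ */

/* Same expression as in sentinelTimer: a value in [10, 19]. */
int jittered_hz(void) {
    return SKETCH_DEFAULT_HZ + rand() % SKETCH_DEFAULT_HZ;
}

/* Delay in ms before the next serverCron run for a given hz (integer division). */
int cron_period_ms(int hz) {
    return 1000 / hz;
}
```

Because each sentinel re-draws hz on every tick, the timers of different sentinels drift apart, which makes it less likely that several of them start a failover and request votes in the same instant.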
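TILT Mode
sentinelTimer first calls sentinelCheckTiltCondition to decide whether to enter TILT mode. In TILT mode the sentinel keeps collecting information about other instances over its command and Pub/Sub connections, but it does not mark instances as subjectively or objectively down and does not start failovers. If the difference between the current time and the previous run is negative or greater than 2 s (SENTINEL_TILT_TRIGGER), the sentinel enters TILT mode; it leaves TILT mode once SENTINEL_TILT_PERIOD has elapsed. A negative or unusually large gap suggests that the system clock was changed or the process was blocked, so time-based judgements made in that window cannot be trusted. The check looks like this: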
#sentinel.c
void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    // Time elapsed since the previous run
    mstime_t delta = now - sentinel.previous_time;
    // Enter TILT mode if the delta is negative or larger than 2 s
    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        // Emit the +tilt event
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    // Record when the timer handler last ran
    sentinel.previous_time = mstime();
}
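The enter and exit rules of TILT mode can be sketched as two small functions over a hypothetical tilt_state struct. The constants assume SENTINEL_TILT_TRIGGER = 2000 ms and SENTINEL_TILT_PERIOD = 30000 ms; timestamps are passed in explicitly so the logic can be exercised without a real clock.

```c
typedef long long mstime_t;

#define SKETCH_TILT_TRIGGER 2000        /* assumed SENTINEL_TILT_TRIGGER, ms */
#define SKETCH_TILT_PERIOD  (1000*30)   /* assumed SENTINEL_TILT_PERIOD, ms */

typedef struct {
    int tilt;
    mstime_t tilt_start_time;
    mstime_t previous_time;
} tilt_state;

/* Like sentinelCheckTiltCondition: a negative or >2 s gap between two timer
 * runs puts the sentinel into TILT mode. */
void check_tilt(tilt_state *s, mstime_t now) {
    mstime_t delta = now - s->previous_time;
    if (delta < 0 || delta > SKETCH_TILT_TRIGGER) {
        s->tilt = 1;
        s->tilt_start_time = now;
    }
    s->previous_time = now;
}

/* Like the exit check in sentinelHandleRedisInstance: returns 1 while the
 * sentinel must still skip down detection and failover, and leaves TILT
 * mode once the period has elapsed. */
int still_in_tilt(tilt_state *s, mstime_t now) {
    if (!s->tilt) return 0;
    if (now - s->tilt_start_time < SKETCH_TILT_PERIOD) return 1;
    s->tilt = 0;
    return 0;
}
```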
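Instance Monitoring
After the TILT check, sentinelHandleDictOfRedisInstances is called. There the sentinel establishes connections to the other instances, obtains and updates instance information by sending commands and subscribing to channels, and monitors the state of each instance: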
#sentinel.c
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;
    // Iterate over the given instances (initially the monitored masters)
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // Handle this instance
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            // Handle the master's replicas
            sentinelHandleDictOfRedisInstances(ri->slaves);
            // Handle the other sentinels monitoring this master
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            // The failover has reached its final stage
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                // Remember the master whose configuration must be switched
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        // Replace the old master with the promoted replica in the sentinel's state
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}
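A sentinel opens two connections to each monitored master and replica, a command connection and a Pub/Sub (subscription) connection; to other sentinels it opens only a command connection. The command connection is used to send commands to the instance and receive the replies: for example, PING to check whether the instance is subjectively down, INFO to obtain master and replica information, and SENTINEL is-master-down-by-addr to ask other sentinels about the master's state. The Pub/Sub connection is used to receive messages that other instances publish to the subscribed channel: for example, by subscribing to the __sentinel__:hello channel, a sentinel discovers the other sentinels that publish to it. The implementation is in sentinelHandleRedisInstance: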
#sentinel.c
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    // Re-establish the command or Pub/Sub connection if it is down
    sentinelReconnectInstance(ri);
    // Send periodic commands (INFO, PING, hello via PUBLISH)
    sentinelSendPeriodicCommands(ri);
    // In TILT mode, skip the checks below until SENTINEL_TILT_PERIOD (30 s by default)
    // has elapsed, then exit TILT mode
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }
    // Check for subjective down
    sentinelCheckSubjectivelyDown(ri);
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }
    // Masters only: check for objective down and decide whether to fail over
    if (ri->flags & SRI_MASTER) {
        // Check for objective down
        sentinelCheckObjectivelyDown(ri);
        // Check whether a failover should be started
        if (sentinelStartFailoverIfNeeded(ri))
            // Ask the other sentinels for their view of the master and request their votes
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        // Run the failover state machine
        sentinelFailoverStateMachine(ri);
        // Periodically ask the other sentinels about the master's state
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
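Reconnecting
sentinelReconnectInstance rebuilds a connection only when three conditions hold: the instance's link is disconnected, the instance has a valid address, and at least SENTINEL_PING_PERIOD (1 s) has passed since the last reconnection attempt.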
#sentinel.c
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    // Nothing to do if the link is still connected
    if (ri->link->disconnected == 0) return;
    // Skip instances without a valid address
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();
    // Try at most once per SENTINEL_PING_PERIOD (1 s)
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;
    // Establish the command connection
    if (link->cc == NULL) {
        // Resolve the hostname if hostname resolution is enabled (disabled by default)
        if (sentinel.resolve_hostnames) {
            sentinelAddr *tryResolveAddr = createSentinelAddr(ri->addr->hostname, ri->addr->port, 0);
            if (tryResolveAddr != NULL) {
                releaseSentinelAddr(ri->addr);
                ri->addr = tryResolveAddr;
            }
        }
        // Open an asynchronous connection
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        // Set the close-on-exec flag on the socket
        if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd);
        // Could not create the connection: emit -cmd-link-reconnection
        if (!link->cc) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->cc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
            // TLS replication is enabled but TLS setup failed: emit -cmd-link-reconnection
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
            instanceLinkCloseConnection(link,link->cc);
        } else if (link->cc->err) {
            // The connection reported an error: emit -cmd-link-reconnection
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            // Attach the command connection to the event loop
            redisAeAttach(server.el,link->cc);
            // Callback invoked when the connection is established
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            // Callback invoked when the connection is closed
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            // Send AUTH if a password is required
            sentinelSendAuthIfNeeded(ri,link->cc);
            // Name the connection via CLIENT SETNAME
            sentinelSetClientName(ri,link->cc,"cmd");
            // Send a PING as soon as possible after reconnecting
            sentinelSendPing(ri);
        }
    }
    // Establish the Pub/Sub connection (masters and replicas only)
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        // Open an asynchronous connection
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        // Set the close-on-exec flag on the socket
        if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd);
        if (!link->pc) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->pc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
        } else if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;
            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            // Subscribe to the __sentinel__:hello channel
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
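sentinelReconnectInstance calls redisAsyncConnectBind to open each connection and attaches it to the event loop; because the connections are asynchronous, callbacks are registered to handle connection establishment and disconnection. Once a connection is created, sentinelSendAuthIfNeeded sends AUTH when authentication is required, and sentinelSetClientName sends CLIENT SETNAME to name the connection. For the command connection, sentinelSendPing then sends a PING to test the link and registers the sentinelPingReplyCallback callback, which processes the reply and updates the instance state. For the Pub/Sub connection, a SUBSCRIBE command subscribes to the __sentinel__:hello channel and registers the sentinelReceiveHelloMessages callback, which processes messages received on that channel to learn about other sentinels and their view of the master's configuration. Replicas, in contrast, are discovered from the master's INFO replies.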
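The hello messages received on __sentinel__:hello are plain comma-separated strings. In Redis, sentinelSendHello publishes eight fields (sentinel IP, port, run ID, current epoch, then master name, IP, port and config epoch), and the receiving side rejects messages that do not split into exactly eight tokens. The parser below is a hypothetical, simplified sketch of that decoding step, not the Redis implementation; the hello_msg struct and parse_hello are made up for illustration.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Decoded hello message; field order follows the published payload. */
typedef struct {
    char ip[64];
    int port;
    char runid[41];                 /* 40 hex characters + NUL */
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Parses "ip,port,runid,epoch,master_name,master_ip,master_port,master_epoch".
 * Returns 0 on success, -1 if the payload does not have exactly 8 fields. */
int parse_hello(const char *payload, hello_msg *h) {
    char buf[512];
    char *tok[8];
    int n = 0;
    if (strlen(payload) >= sizeof(buf)) return -1;
    strcpy(buf, payload);
    for (char *p = strtok(buf, ","); p != NULL; p = strtok(NULL, ",")) {
        if (n == 8) return -1;      /* too many fields */
        tok[n++] = p;
    }
    if (n != 8) return -1;          /* too few fields */
    snprintf(h->ip, sizeof(h->ip), "%s", tok[0]);
    h->port = atoi(tok[1]);
    snprintf(h->runid, sizeof(h->runid), "%s", tok[2]);
    h->current_epoch = strtoull(tok[3], NULL, 10);
    snprintf(h->master_name, sizeof(h->master_name), "%s", tok[4]);
    snprintf(h->master_ip, sizeof(h->master_ip), "%s", tok[5]);
    h->master_port = atoi(tok[6]);
    h->master_config_epoch = strtoull(tok[7], NULL, 10);
    return 0;
}
```

strtok collapses empty fields, whereas Redis splits the payload with sdssplitlen, so this sketch is slightly more lenient than the real parser.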
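Sending Periodic Commands
Once the command connection is established, the sentinel sends INFO to masters and replicas every 10 seconds by default (every second to a replica whose master is objectively down or being failed over, or whose replication link is down), and registers the sentinelInfoReplyCallback callback to process the reply and update its view of the master-replica topology. It sends PING to masters, replicas and other sentinels every second and registers sentinelPingReplyCallback to process the reply and determine whether the instance is reachable. Every two seconds it sends a PUBLISH command that posts the sentinel's own information and that of the monitored master to the __sentinel__:hello channel; other sentinels subscribed to the channel receive the message and thereby learn about this sentinel.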
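#sentinel.c
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;
    // Nothing to send if the instance is not connected
    if (ri->link->disconnected) return;
    // (a cap on the number of pending commands is also checked here; omitted)
    // Replicas of a master that is O_DOWN or being failed over (or whose
    // replication link is down) get INFO every second instead of every 10 s
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }
    // PING period: down-after-milliseconds, capped at SENTINEL_PING_PERIOD (1 s)
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
    // Send INFO (not to sentinels)
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }
    // Send PING if the last PONG is too old and no PING was sent recently
    if ((now - ri->link->last_pong_time) > ping_period &&
        (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }
    // Publish a hello message every SENTINEL_PUBLISH_PERIOD (2 s)
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}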
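The three timing rules in sentinelSendPeriodicCommands can be condensed into one decision function. This is a hypothetical sketch: sched_view flattens the relevant fields of sentinelRedisInstance and instanceLink, fast_info stands for the "replica of an O_DOWN or failing-over master" case, and the disconnected-link and pending-command checks are left out. The constants assume SENTINEL_PING_PERIOD = 1000, SENTINEL_INFO_PERIOD = 10000 and SENTINEL_PUBLISH_PERIOD = 2000 ms.

```c
typedef long long mstime_t;

#define SKETCH_PING_PERIOD    1000   /* assumed SENTINEL_PING_PERIOD */
#define SKETCH_INFO_PERIOD    10000  /* assumed SENTINEL_INFO_PERIOD */
#define SKETCH_PUBLISH_PERIOD 2000   /* assumed SENTINEL_PUBLISH_PERIOD */

enum { SEND_INFO = 1, SEND_PING = 2, SEND_HELLO = 4 };

typedef struct {
    int is_sentinel;
    int fast_info;                   /* INFO every second instead of every 10 s */
    mstime_t down_after_period;
    mstime_t info_refresh, last_ping_time, last_pong_time, last_pub_time;
} sched_view;

/* Returns a bitmask of the commands that are due at time `now`. */
int due_commands(const sched_view *v, mstime_t now) {
    int mask = 0;
    mstime_t info_period = v->fast_info ? 1000 : SKETCH_INFO_PERIOD;
    mstime_t ping_period = v->down_after_period;
    if (ping_period > SKETCH_PING_PERIOD) ping_period = SKETCH_PING_PERIOD;

    if (!v->is_sentinel &&
        (v->info_refresh == 0 || now - v->info_refresh > info_period))
        mask |= SEND_INFO;
    if (now - v->last_pong_time > ping_period &&
        now - v->last_ping_time > ping_period / 2)
        mask |= SEND_PING;
    if (now - v->last_pub_time > SKETCH_PUBLISH_PERIOD)
        mask |= SEND_HELLO;
    return mask;
}
```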
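Subjective Down
The sentinel checks each monitored instance to decide whether it is subjectively down. An instance is considered subjectively down in either of two cases: a PING has gone unanswered (or the link has been unavailable) for longer than down_after_period; or the sentinel believes the instance is a master, the instance reports itself as a replica, and it has kept doing so for longer than down_after_period + SENTINEL_INFO_PERIOD*2.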
#sentinel.c
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    // Time since the oldest unanswered PING was sent; if the link is down,
    // time since the link was last available
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;
    // Check whether the command connection should be recreated
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        // Close the connection; it will be re-established later
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }
    // Check whether the Pub/Sub connection should be recreated
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        // Close the connection; it will be re-established later
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }
    // Check for subjective down
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}
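The subjective-down decision at the end of sentinelCheckSubjectivelyDown depends only on a handful of timestamps, so it can be isolated in a pure function. The sketch below is hypothetical: sdown_view flattens the fields the real function reads, and SENTINEL_INFO_PERIOD is assumed to be 10000 ms.

```c
typedef long long mstime_t;

#define SKETCH_INFO_PERIOD 10000 /* assumed SENTINEL_INFO_PERIOD, ms */

typedef struct {
    int is_master;               /* we believe the instance is a master */
    int reported_as_slave;       /* INFO says role:slave (role_reported == SRI_SLAVE) */
    mstime_t act_ping_time;      /* send time of the oldest unanswered PING, 0 if none */
    int disconnected;
    mstime_t last_avail_time;
    mstime_t role_reported_time;
    mstime_t down_after_period;
} sdown_view;

/* Returns 1 if the instance should be flagged subjectively down at `now`. */
int is_subjectively_down(const sdown_view *v, mstime_t now) {
    mstime_t elapsed = 0;
    if (v->act_ping_time)
        elapsed = now - v->act_ping_time;
    else if (v->disconnected)
        elapsed = now - v->last_avail_time;
    if (elapsed > v->down_after_period) return 1;
    /* A "master" that keeps reporting itself as a replica for too long. */
    return v->is_master && v->reported_as_slave &&
           now - v->role_reported_time >
               v->down_after_period + SKETCH_INFO_PERIOD * 2;
}
```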
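Objective Down
For a master that it considers subjectively down, the sentinel counts how many of the sentinels monitoring that master (itself included) also consider it down; if the count is greater than or equal to the configured quorum, the master is marked objectively down. The other sentinels' opinions are the SRI_MASTER_DOWN flags recorded from their replies to SENTINEL is-master-down-by-addr.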
#sentinel.c
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
    // Only if we consider the master subjectively down: count the sentinels that agree
    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // Objectively down once the configured quorum is reached
        if (quorum >= master->quorum) odown = 1;
    }
    // Emit +odown when entering and -odown when leaving the objectively-down state
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}
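The quorum count boils down to a few lines. In this hypothetical sketch, others_master_down holds one entry per other sentinel (1 if its last is-master-down-by-addr reply said the master is down), and quorum_cfg is the quorum from the sentinel monitor directive.

```c
/* Returns 1 if the master should be marked objectively down. */
int is_objectively_down(int self_sdown, const int *others_master_down,
                        int n_others, unsigned int quorum_cfg) {
    unsigned int votes;
    if (!self_sdown) return 0;        /* this sentinel must agree first */
    votes = 1;                        /* count this sentinel */
    for (int i = 0; i < n_others; i++)
        if (others_master_down[i]) votes++;
    return votes >= quorum_cfg;
}
```

With the sample configuration (quorum 1), a single sentinel's own subjective judgement is already enough for objective down. The quorum only governs failure detection; actually performing the failover additionally requires being elected leader.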
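Failover
At the end of sentinelHandleRedisInstance the sentinel examines the monitored master. A failover is started only if the master is objectively down, no failover for it is already in progress, and at least failover_timeout*2 has passed since the previous failover attempt started, so that a failed attempt is not retried too quickly.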
#sentinel.c
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    // The master must be objectively down
    if (!(master->flags & SRI_O_DOWN)) return 0;
    // No failover may already be in progress
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
    // The previous attempt must have started at least failover_timeout*2 ago
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2) return 0;
    // Start the failover
    sentinelStartFailover(master);
    return 1;
}
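If the conditions are met, sentinelStartFailover is called to start the failover. It updates the master's fields: failover_state is set to SENTINEL_FAILOVER_STATE_WAIT_START (waiting for the failover to start), the state in which the sentinel tallies the votes of the leader election; flags, failover_epoch and other fields are updated as well: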
#sentinel.c
void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);
    // Wait for the failover to start (leader election)
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    // Mark a failover as in progress
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    // Start a new epoch for this failover
    master->failover_epoch = ++sentinel.current_epoch;
    // Emit +new-epoch
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    // Emit +try-failover
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
    // Record the failover start time, with a random offset of up to SENTINEL_MAX_DESYNC ms
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    // Record the time of this failover state change
    master->failover_state_change_time = mstime();
}
If sentinelStartFailoverIfNeeded decides that a failover is needed, sentinelAskMasterStateToOtherSentinels is called, and the sentinel sends SENTINEL is-master-down-by-addr to the other sentinels to run the leader election.
#sentinel.c
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;
    di = dictGetIterator(master->sentinels);
    // Iterate over all sentinels monitoring this master
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // Time since we last received a reply to SENTINEL is-master-down-by-addr from this sentinel
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;
        // If that reply is older than SENTINEL_ASK_PERIOD*5, discard its cached opinion and vote
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }
        // Only ask if we consider the master subjectively down
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        // Skip sentinels whose link is down
        if (ri->link->disconnected) continue;
        // Unless forced, ask at most once per SENTINEL_ASK_PERIOD
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;
        ll2string(port,sizeof(port),master->addr->port);
        // Send SENTINEL is-master-down-by-addr; our run ID is sent only when a failover is in progress
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    announceSentinelAddr(master->addr), port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}
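A sentinel sends SENTINEL is-master-down-by-addr <ip> <port> <current-epoch> <runid> to another sentinel only when: 1. it considers the master subjectively down; 2. the link to that sentinel is up; 3. either flags contains SENTINEL_ASK_FORCED, or no reply has been received from that sentinel within the last SENTINEL_ASK_PERIOD. If the master's failover_state is greater than SENTINEL_FAILOVER_STATE_NONE (a failover is in progress), the sentinel puts its own run ID in the runid argument, asking the target sentinel to vote for it as leader; otherwise it sends *, which only asks for the target's opinion of the master's state.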
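When the target sentinel receives SENTINEL is-master-down-by-addr, it first checks whether the runid argument is *. If it is not, it calls sentinelVoteLeader to cast its vote and returns the result to the requesting sentinel. This logic lives in the sentinel command handler: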
#sentinel.c
void sentinelCommand(client *c) {
    ...
    else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;
        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL) != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);
        // isdown = 1 if we are not in TILT mode and the instance exists,
        // is a master and is subjectively down
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;
        // If the instance is a master and the runid argument is not *, vote for a leader
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }
        // Reply with a 3-element array: isdown, leader, leader_epoch
        addReplyArrayLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    }
    ...
}
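sentinelVoteLeader performs the vote. A sentinel votes on a first-come, first-served basis: within a given epoch it votes for the first sentinel that asks. It grants a vote only if the master's recorded leader_epoch is smaller than the requested epoch and its own current epoch is not greater than the requested epoch. In other words, if it has already voted in req_epoch or in a later epoch, it returns its earlier vote; otherwise it votes for the requesting sentinel: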
#sentinel.c
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    // If the requested epoch is newer, adopt it as our current epoch
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }
    // Vote only if we have not voted in this epoch yet and our epoch is not ahead of the request
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        // If we voted for another sentinel, delay our own failover attempt for this master
        if (strcasecmp(master->leader,sentinel.myid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }
    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}
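The voting rule can be exercised in isolation. The sketch below is a hypothetical simplification of sentinelVoteLeader: vote_state merges sentinel.current_epoch with the master's leader and leader_epoch fields, and config flushing, event emission and the failover-delay side effect are omitted.

```c
#include <stdint.h>
#include <stdio.h>

typedef struct {
    uint64_t current_epoch;  /* like sentinel.current_epoch */
    uint64_t leader_epoch;   /* like master->leader_epoch */
    char leader[41];         /* like master->leader; "" when no vote yet */
} vote_state;

/* Grants at most one vote per epoch, to the first requester, and returns the
 * run ID this sentinel has voted for; *leader_epoch receives that vote's epoch. */
const char *vote_leader(vote_state *s, uint64_t req_epoch,
                        const char *req_runid, uint64_t *leader_epoch) {
    if (req_epoch > s->current_epoch)
        s->current_epoch = req_epoch;          /* adopt the newer epoch */
    if (s->leader_epoch < req_epoch && s->current_epoch <= req_epoch) {
        snprintf(s->leader, sizeof(s->leader), "%s", req_runid);
        s->leader_epoch = s->current_epoch;    /* the vote belongs to this epoch */
    }
    *leader_epoch = s->leader_epoch;
    return s->leader;
}
```

In a run like the one in the test below, the second request (B in epoch 6) is refused because A asked first in that epoch, and a late request from an older epoch cannot change an existing vote.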
When sentinelAskMasterStateToOtherSentinels sends SENTINEL is-master-down-by-addr, it registers the sentinelReceiveIsMasterDownReply callback to process the reply.
#sentinel.c
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;
    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;
    // Expect a 3-element array: isdown, leader run ID, leader epoch
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            // This sentinel also considers the master down
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                serverLog(LL_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            // Record the leader this sentinel voted for
            ri->leader = sdsnew(r->element[1]->str);
            // and the epoch of that vote
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}
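Once a leader has been elected, that leader sentinel performs the failover. The failover logic lives in sentinelFailoverStateMachine, a state machine that calls a different function depending on the current stage of the failover: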
#sentinel.c
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            // Count the votes and check whether we won the election
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            // Select the best replica
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            // Promote the selected replica to master
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            // Wait for the promotion; abort the failover on timeout
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            // Point the remaining replicas at the new master
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}
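The stages handled by sentinelFailoverStateMachine, plus the final UPDATE_CONFIG stage picked up by sentinelHandleDictOfRedisInstances, form a linear happy path. The sketch below is hypothetical: the fo_state names abbreviate the SENTINEL_FAILOVER_STATE_* constants, and an abort is modelled as a return to FO_NONE, mirroring sentinelAbortFailover, which resets failover_state to SENTINEL_FAILOVER_STATE_NONE and is only used up to the WAIT_PROMOTION stage.

```c
typedef enum {
    FO_NONE,
    FO_WAIT_START,
    FO_SELECT_SLAVE,
    FO_SEND_SLAVEOF_NOONE,
    FO_WAIT_PROMOTION,
    FO_RECONF_SLAVES,
    FO_UPDATE_CONFIG
} fo_state;

/* Next state after the current stage completes (aborted == 0) or gives up
 * (aborted == 1). Only the stages up to WAIT_PROMOTION can abort. */
fo_state next_state(fo_state s, int aborted) {
    if (aborted && s >= FO_WAIT_START && s <= FO_WAIT_PROMOTION)
        return FO_NONE;
    switch (s) {
    case FO_WAIT_START:         return FO_SELECT_SLAVE;        /* elected leader */
    case FO_SELECT_SLAVE:       return FO_SEND_SLAVEOF_NOONE;  /* replica chosen */
    case FO_SEND_SLAVEOF_NOONE: return FO_WAIT_PROMOTION;      /* SLAVEOF NO ONE sent */
    case FO_WAIT_PROMOTION:     return FO_RECONF_SLAVES;       /* replica reports master role */
    case FO_RECONF_SLAVES:      return FO_UPDATE_CONFIG;       /* replicas reconfigured or timed out */
    default:                    return s;                      /* NONE and UPDATE_CONFIG stay put */
    }
}
```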
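SENTINEL_FAILOVER_STATE_WAIT_START:
When sentinelStartFailoverIfNeeded decides to fail over, it calls sentinelStartFailover, which sets failover_state to SENTINEL_FAILOVER_STATE_WAIT_START, and the sentinel then sends SENTINEL is-master-down-by-addr to request votes for the leader election. While failover_state is SENTINEL_FAILOVER_STATE_WAIT_START, sentinelFailoverWaitStart is called. It tallies, for the current epoch, the votes received by the sentinels monitoring the master and determines the winner. If there is no winner, or the winner is not this sentinel, and the master does not have the SRI_FORCE_FAILOVER flag, the function simply returns as long as the election has lasted less than min(SENTINEL_ELECTION_TIMEOUT, failover_timeout); once that time is exceeded, the failover is aborted. If this sentinel has won, it performs the failover and sets failover_state to SENTINEL_FAILOVER_STATE_SELECT_SLAVE: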
#sentinel.c
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;
    // Get the election winner for this epoch; NULL means no leader has been elected
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    // Are we the elected leader?
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);
    // Not the leader (or no leader yet) and not a forced failover
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;
        // Abort the failover once the election has taken too long
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    // Emit +elected-leader
    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();
    // Move to the replica selection stage
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    // Record the time of this state change
    ri->failover_state_change_time = mstime();
    // Emit +failover-state-select-slave
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
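sentinelGetLeader, called at the top of sentinelFailoverWaitStart, is not listed in this article. The rule it applies in Redis is that the candidate with the most votes in the epoch wins only if it has both an absolute majority of all voters (the known sentinels plus this one) and at least quorum votes. The sketch below illustrates just that rule; pick_leader and its inputs are hypothetical, the votes array is assumed to already include this sentinel's own vote, and ties are resolved here simply by keeping the first candidate seen.

```c
#include <string.h>

/* votes[i] is the run ID voter i voted for in this epoch ("" = no vote).
 * Returns the index of a voter whose choice wins, or -1 if no candidate has
 * both a majority of all voters and at least `quorum` votes. */
int pick_leader(const char *const *votes, int voters, unsigned int quorum) {
    int best = -1;
    int best_count = 0;
    for (int i = 0; i < voters; i++) {
        if (votes[i][0] == '\0') continue;
        int count = 0;
        for (int j = 0; j < voters; j++)
            if (strcmp(votes[i], votes[j]) == 0) count++;
        if (count > best_count) {
            best_count = count;
            best = i;
        }
    }
    int majority = voters / 2 + 1;
    if (best < 0 || best_count < majority || best_count < (int)quorum) return -1;
    return best;
}
```

This is why quorum alone is not sufficient to fail over: with five sentinels, a candidate needs at least three votes even if quorum is set to 2.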
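SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
When failover_state is SENTINEL_FAILOVER_STATE_SELECT_SLAVE, sentinelFailoverSelectSlave is called to choose the best replica. If no suitable replica is found, the failover is aborted. Otherwise the replica is flagged SRI_PROMOTED, the master's promoted_slave is set to it, failover_state is set to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE, and the state change time is updated: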
#sentinel.c
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    // Select the best replica
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);
    // Abort the failover if no suitable replica was found
    if (slave == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        // Move to the SLAVEOF NO ONE stage
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        // Record the time of this state change
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}
The selection logic is in sentinelSelectSlave, which picks the best candidate among all replicas of the master:
#sentinel.c
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;
    // Maximum time a replica may have been disconnected from the master:
    // how long the master has been SDOWN, plus 10 * down_after_period
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;
        // Skip replicas that are subjectively or objectively down
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        // Skip replicas whose link is disconnected
        if (slave->link->disconnected) continue;
        // Skip replicas that have not replied to PING within SENTINEL_PING_PERIOD*5
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        // Skip replicas with priority 0 (never eligible for promotion)
        if (slave->slave_priority == 0) continue;
        // If the master is SDOWN, replica INFO is expected roughly every second,
        // so the last INFO reply must be no older than SENTINEL_PING_PERIOD*5
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            // Otherwise (master not SDOWN), allow up to SENTINEL_INFO_PERIOD*3
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        // Skip replicas whose last INFO reply is too old
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        // Skip replicas disconnected from the master for longer than max_master_down_time
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        // Sort the candidates and take the first one: lowest slave_priority value,
        // then largest replication offset, then lexicographically smallest run ID
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}
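To make the filters and the ordering concrete, here is a minimal, self-contained C sketch of the same selection rules. It is not Redis code. The Replica struct, its fields, and the helpers replica_eligible, compare_for_promotion and select_replica are hypothetical. PING_PERIOD and INFO_PERIOD are assumed to match SENTINEL_PING_PERIOD (1000 ms) and SENTINEL_INFO_PERIOD (10000 ms). Run IDs are assumed to be non-NULL and are compared with strcmp.

As a worked example, take down-after-milliseconds 30000 from the configuration above and a master that has been SDOWN for 5000 ms. Then max_master_down_time = 5000 + 30000 * 10 = 305000 ms.

```c
/* Hypothetical, simplified model of sentinelSelectSlave(); not the Redis API. */
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef long long mstime_t;

#define PING_PERIOD 1000   /* assumed to match SENTINEL_PING_PERIOD (ms) */
#define INFO_PERIOD 10000  /* assumed to match SENTINEL_INFO_PERIOD (ms) */

typedef struct {
    const char *runid;               /* assumed non-NULL */
    int down;                        /* replica is S_DOWN or O_DOWN */
    int disconnected;                /* link to the replica is down */
    mstime_t last_avail_time;        /* time of last valid PING reply */
    int priority;                    /* slave_priority; 0 = never promote */
    mstime_t info_refresh;           /* time of last INFO reply */
    mstime_t master_link_down_time;  /* how long the replica lost its master */
    long long repl_offset;           /* replication offset */
} Replica;

/* Applies the same filters as the loop in sentinelSelectSlave(). */
static int replica_eligible(const Replica *r, mstime_t now, int master_sdown,
                            mstime_t master_sdown_since,
                            mstime_t down_after_period) {
    mstime_t max_master_down_time = 0;
    if (master_sdown) max_master_down_time += now - master_sdown_since;
    max_master_down_time += down_after_period * 10;
    mstime_t info_validity_time =
        master_sdown ? PING_PERIOD * 5 : INFO_PERIOD * 3;

    if (r->down || r->disconnected) return 0;
    if (now - r->last_avail_time > PING_PERIOD * 5) return 0;
    if (r->priority == 0) return 0;
    if (now - r->info_refresh > info_validity_time) return 0;
    if (r->master_link_down_time > max_master_down_time) return 0;
    return 1;
}

/* Lower priority value first, then larger offset, then smaller run ID. */
static int compare_for_promotion(const void *a, const void *b) {
    const Replica *x = *(const Replica *const *)a;
    const Replica *y = *(const Replica *const *)b;
    if (x->priority != y->priority) return x->priority - y->priority;
    if (x->repl_offset > y->repl_offset) return -1;
    if (x->repl_offset < y->repl_offset) return 1;
    return strcmp(x->runid, y->runid);
}

/* Returns the best eligible replica, or NULL if none qualifies. */
static const Replica *select_replica(const Replica *rs, size_t n, mstime_t now,
                                     int master_sdown,
                                     mstime_t master_sdown_since,
                                     mstime_t down_after_period) {
    const Replica **cand = malloc(sizeof(*cand) * (n ? n : 1));
    assert(cand != NULL);
    size_t k = 0;
    for (size_t i = 0; i < n; i++)
        if (replica_eligible(&rs[i], now, master_sdown, master_sdown_since,
                             down_after_period))
            cand[k++] = &rs[i];
    const Replica *best = NULL;
    if (k) {
        qsort(cand, k, sizeof(*cand), compare_for_promotion);
        best = cand[0];
    }
    free(cand);
    return best;
}
```

Here are a few cases that follow from these rules. Two eligible replicas with the same priority and offset are ordered by run ID, so "aaa" wins over "bbb". A replica with priority 0 is never picked. A replica whose last INFO reply is 9 s old is rejected while the master is SDOWN, because the limit is then 5 s. The same replica is accepted otherwise, because the limit is then 30 s.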
SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
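When the failover state is SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE, sentinelFailoverSendSlaveOfNoOne is called. It first checks the link to the promoted replica. If the link is down, the function returns and tries again on the next call, and it aborts the failover only once failover_timeout has elapsed. If the link is up, it calls sentinelSendSlaveOf. That function sends the SLAVEOF, CONFIG REWRITE and CLIENT KILL TYPE commands as a single MULTI/EXEC transaction. If sentinelSendSlaveOf succeeds, the failover state is updated to SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: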
#sentinel.c
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;
    // If the link to the chosen replica is down, return and retry later;
    // abort the failover once failover_timeout has elapsed
    if (ri->promoted_slave->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    // Send SLAVEOF NO ONE (inside a MULTI/EXEC transaction)
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL);
    if (retval != C_OK) return;
    // Emit the +failover-state-wait-promotion event
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    // Advance the state
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    // Record when the failover state changed
    ri->failover_state_change_time = mstime();
}
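The link check at the top of sentinelFailoverSendSlaveOfNoOne reduces to a three-way decision. The sketch below uses a hypothetical enum and helper (step_t, send_noone_step) that are not part of Redis.

Note the strict > comparison. Suppose failover-timeout is 180000, as in the configuration above, and the state changed at t = 1000 ms. A disconnected replica is then still retried at t = 181000, and the failover is aborted only from t = 181001 on.

```c
/* Hypothetical model of the link check in sentinelFailoverSendSlaveOfNoOne();
 * send_noone_step and step_t are illustrative names, not Redis APIs. */
#include <assert.h>

typedef long long mstime_t;

typedef enum {
    STEP_SEND,        /* link is up: send SLAVEOF NO ONE now */
    STEP_RETRY_LATER, /* link is down: do nothing, try again on the next call */
    STEP_ABORT        /* link is down and failover_timeout has elapsed */
} step_t;

static step_t send_noone_step(int link_disconnected, mstime_t now,
                              mstime_t state_change_time,
                              mstime_t failover_timeout) {
    assert(failover_timeout > 0);
    if (!link_disconnected) return STEP_SEND;
    /* Strict comparison, as in the Redis source shown above. */
    if (now - state_change_time > failover_timeout) return STEP_ABORT;
    return STEP_RETRY_LATER;
}
```

The sketch does not model the case where sentinelSendSlaveOf itself fails. In the real code that case also just returns, and the transaction is attempted again on a later call.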
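sentinelSendSlaveOf sends the SLAVEOF NO ONE, CONFIG REWRITE and CLIENT KILL TYPE commands to the replica inside a transaction:

- SLAVEOF NO ONE promotes the replica to master.
- CONFIG REWRITE writes the new role back to the replica's configuration file.
- CLIENT KILL TYPE disconnects the clients connected to the replica (both normal and pub/sub clients).

When addr is not NULL, the same function sends SLAVEOF <ip> <port> instead of SLAVEOF NO ONE.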
#sentinel.c
int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
    char portstr[32];
    const char *host;
    int retval;
    // If addr is NULL send SLAVEOF NO ONE, otherwise send SLAVEOF <ip> <port>
    if (!addr) {
        host = "NO";
        memcpy(portstr,"ONE",4);
    } else {
        host = announceSentinelAddr(addr);
        ll2string(portstr,sizeof(portstr),addr->port);
    }
    // Send MULTI
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"MULTI"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;
    // Send SLAVEOF
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;
    // Send CONFIG REWRITE
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s REWRITE",
        sentinelInstanceMapCommand(ri,"CONFIG"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;
    for (int type = 0; type < 2; type++) {
        // Send CLIENT KILL TYPE normal, then CLIENT KILL TYPE pubsub
        retval = redisAsyncCommand(ri->link->cc,
            sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
            sentinelInstanceMapCommand(ri,"CLIENT"),
            type == 0 ? "normal" : "pubsub");
        if (retval == C_ERR) return retval;
        ri->link->pending_commands++;
    }
    // Send EXEC
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"EXEC"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;
    return C_OK;
}
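The transaction that sentinelSendSlaveOf queues can be reproduced as plain strings. The sketch below uses a hypothetical helper, build_slaveof_txn, that only formats the six commands in order and sends nothing. It ignores sentinelInstanceMapCommand, which in Redis substitutes the names of commands that have been renamed.

```c
/* Hypothetical sketch: formats the MULTI/EXEC sequence queued by
 * sentinelSendSlaveOf(). No network I/O is performed. */
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_CMDS 6
#define CMD_LEN  64

/* Fills cmds[] and returns the number of commands.
 * host == NULL means "promote": SLAVEOF NO ONE. */
static int build_slaveof_txn(const char *host, int port,
                             char cmds[MAX_CMDS][CMD_LEN]) {
    int n = 0;
    snprintf(cmds[n++], CMD_LEN, "MULTI");
    if (host == NULL)
        snprintf(cmds[n++], CMD_LEN, "SLAVEOF NO ONE");
    else
        snprintf(cmds[n++], CMD_LEN, "SLAVEOF %s %d", host, port);
    snprintf(cmds[n++], CMD_LEN, "CONFIG REWRITE");
    /* Disconnect normal clients first, then pub/sub clients. */
    for (int type = 0; type < 2; type++)
        snprintf(cmds[n++], CMD_LEN, "CLIENT KILL TYPE %s",
                 type == 0 ? "normal" : "pubsub");
    snprintf(cmds[n++], CMD_LEN, "EXEC");
    assert(n == MAX_CMDS);
    return n;
}
```

Passing NULL as host yields SLAVEOF NO ONE, the promotion case. Passing an address such as 127.0.0.1 and 6380 (example values) yields SLAVEOF 127.0.0.1 6380, the form used later to repoint the other replicas. Because everything is wrapped in MULTI/EXEC, the replica executes the role change, the config rewrite and the client kills as one unit.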
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
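When the failover state is SENTINEL_FAILOVER_STATE_WAIT_PROMOTION, sentinelFailoverWaitPromotion is called. This function only handles the timeout. If more than failover_timeout has passed since the last state change, the promotion is considered failed and the failover is aborted. The transition to the next state is driven by the promoted replica's INFO reply, described below.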
#sentinel.c
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    /* Just handle the timeout. Switching to the next state is handled
     * by the function parsing the INFO command of the promoted slave. */
    // The current failover state has lasted longer than failover_timeout
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        // Emit the -failover-abort-slave-timeout event
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        // Abort the failover
        sentinelAbortFailover(ri);
    }
}
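Sentinel periodically sends INFO to masters and replicas and uses the replies to refresh its view of each instance. When an INFO reply arrives from the replica being promoted, Sentinel checks whether that replica has become a master. The check lives in sentinelRefreshInstanceInfo, which is called from the INFO reply callback sentinelInfoReplyCallback: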
#sentinel.c
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    ...
    // The instance was known as a replica but now reports the master role
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        // It is the replica chosen for promotion, a failover is in progress,
        // and the failover state is SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            // Adopt the failover epoch as the master's config_epoch
            ri->master->config_epoch = ri->master->failover_epoch;
            // Advance the state to SENTINEL_FAILOVER_STATE_RECONF_SLAVES
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            // Record when the failover state changed
            ri->master->failover_state_change_time = mstime();
            // Persist the Sentinel configuration file
            sentinelFlushConfig();
            // Emit the +promoted-slave event
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            // Schedule the client-reconfig-script with the "start" state
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            // Reset last_pub_time so hello messages are published on
            // __sentinel__:hello as soon as possible
            sentinelForceHelloUpdateForMaster(ri->master);
        }
    }
    ...
}
SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
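When the failover state is SENTINEL_FAILOVER_STATE_RECONF_SLAVES, sentinelFailoverReconfNextSlave is called. It sends SLAVEOF <new master address> to the remaining replicas so that they resynchronize with the new master. At most parallel_syncs replicas are reconfigured at the same time:
#sentinel.c
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {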
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;
    // Count the replicas currently being reconfigured
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);
    di = dictGetIterator(master->slaves);
    // Keep reconfiguring replicas while fewer than parallel_syncs are in progress
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;
        // Skip the newly promoted master and replicas already reconfigured
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        // If SLAVEOF was sent more than SENTINEL_SLAVE_RECONF_TIMEOUT ago,
        // stop waiting and mark the replica as SRI_RECONF_DONE
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }
        // Skip replicas that are still being reconfigured
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        // Skip replicas whose link is disconnected
        if (slave->link->disconnected) continue;
        // Send SLAVEOF <new master ip> <new master port>
        retval = sentinelSendSlaveOf(slave,master->promoted_slave->addr);
        if (retval == C_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);
    // Check whether all replicas are reconfigured or the failover has timed out
    sentinelFailoverDetectEnd(master);
}
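A small simulation shows the effect of parallel_syncs. This is a simplified model, not Redis code. reconf_tick plays the role of one call to sentinelFailoverReconfNextSlave. The model assumes, hypothetically, that every replica that received SLAVEOF finishes syncing before the next call. It does not model the SRI_RECONF_INPROG stage, disconnected replicas, or SENTINEL_SLAVE_RECONF_TIMEOUT. rstate_t, reconf_tick, finish_syncs and ticks_to_reconfigure are illustrative names.

```c
/* Hypothetical simulation of the parallel-syncs throttle in
 * sentinelFailoverReconfNextSlave(); not Redis code. */
#include <assert.h>

typedef enum { R_IDLE, R_SENT, R_DONE } rstate_t;

/* One call of the throttled loop: returns how many SLAVEOF commands it sent. */
static int reconf_tick(rstate_t *r, int n, int parallel_syncs) {
    int in_progress = 0, sent = 0;
    for (int i = 0; i < n; i++)
        if (r[i] == R_SENT) in_progress++;
    for (int i = 0; i < n && in_progress < parallel_syncs; i++) {
        if (r[i] != R_IDLE) continue;   /* already sent or done */
        r[i] = R_SENT;
        in_progress++;
        sent++;
    }
    return sent;
}

/* Simplifying assumption: every replica in R_SENT finishes before the next call. */
static void finish_syncs(rstate_t *r, int n) {
    for (int i = 0; i < n; i++)
        if (r[i] == R_SENT) r[i] = R_DONE;
}

/* Number of calls needed until all n replicas are reconfigured. */
static int ticks_to_reconfigure(int n, int parallel_syncs) {
    assert(n >= 0 && n <= 64 && parallel_syncs > 0);
    rstate_t r[64];
    for (int i = 0; i < n; i++) r[i] = R_IDLE;
    int ticks = 0, done = 0;
    while (done < n) {
        reconf_tick(r, n, parallel_syncs);
        finish_syncs(r, n);
        ticks++;
        done = 0;
        for (int i = 0; i < n; i++)
            if (r[i] == R_DONE) done++;
    }
    return ticks;
}
```

Under these assumptions, 4 replicas with parallel-syncs 1 (the value in the configuration above) need 4 rounds. With parallel-syncs 2 they need 2 rounds, and 5 replicas need 3. A higher value shortens the reconfiguration phase. The cost is that more replicas are busy resynchronizing at the same moment, which matters if those replicas also serve read queries.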