概述
所有sentinel节点会订阅master和slave的__sentinel__:hello通道来发现其他sentinel节点
所有sentinel节点周期性的调用info命令检查所有非sentinel的redis节点
所有sentinel会周期性的对非sentinel的redis主节点进行主观下线检查,以及询问其他sentinel该主节点是否离线
所有sentinel会周期性的检查非sentinel的redis主节点是否满足客观下线条件(认为它下线的sentinel数量(包括自己)达到该主节点配置的quorum),如果满足并且需要开始failover,则epoch+1,并进入等待failover状态
进入等待failover状态会尝试获得leader节点,如果不是自己就放弃failover操作,如果是自己则进入挑选从节点状态
进入挑选从节点状态后,选出一个从节点,然后进入slaveof noone状态
进入slaveof noone状态后发送slaveof no one给该从节点,然后进入等待提升状态
在获取info时候检查到此非sentinel的redis从节点在等待提升状态,则写入配置文件,调用脚本,进入重新配置从节点状态
进入重新配置从节点状态后,向所有从节点发送slaveof <new master address>,然后进入更新配置状态
进入更新配置状态后,更新sentinel内存中非sentinel的redis主和从节点的信息并写入配置文件
sentinel的leader节点当认为非sentinel的redis主节点客观下线后,则由它按照上面描述的failover状态机流程执行故障转移
前置代码
int main(int argc, char **argv) {
...
initServer()
...
}
void initServer(void) {
...
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
...
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
if (server.sentinel_mode) sentinelTimer();
...
}
/* Periodic Sentinel entry point: serverCron() calls it on every run while
 * server.sentinel_mode is set. */
void sentinelTimer(void) {
sentinelCheckTiltCondition();
/* Process every instance stored in the sentinel.masters dict. */
sentinelHandleDictOfRedisInstances(sentinel.masters);
/* Script housekeeping, going by the helper names (their bodies are not
 * part of this excerpt). */
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
/* Re-assign server.hz on every call to a pseudo-random value in
 * [CONFIG_DEFAULT_HZ, 2*CONFIG_DEFAULT_HZ - 1].
 * NOTE(review): presumably this keeps different Sentinels from running
 * their timers in lock-step - confirm against upstream. */
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}
相关代码
/* Runs sentinelHandleRedisInstance() on every instance stored in `instances`.
 * For entries flagged SRI_MASTER it also recurses into that master's
 * `slaves` and `sentinels` dicts. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
...
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
/* Only remember the master here; the switch itself is performed after
 * the loop. NOTE(review): presumably because the switch modifies the
 * dicts that are still being iterated - confirm. */
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
/* At most one master is switched per call: if several are in the
 * UPDATE_CONFIG state, the last one visited wins. */
if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
重置旧主节点,把它的地址修改为新提升的从节点的地址,并重新添加其他从节点
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
...
sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
...
sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
oldaddr = master->addr;
master->addr = newaddr;
master->o_down_since_time = 0;
master->s_down_since_time = 0;
for (j = 0; j < numslaves; j++) {
sentinelRedisInstance *slave;
slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
releaseSentinelAddr(slaves[j]);
if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
zfree(slaves);
}
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
}
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
重连
sentinelReconnectInstance(ri);
发送命令
sentinelSendPeriodicCommands(ri);
...
检查主观下线
sentinelCheckSubjectivelyDown(ri);
只针对主节点
if (ri->flags & SRI_MASTER) {
检查是否客观下线
sentinelCheckObjectivelyDown(ri);
是否需要开始failover
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
...
重连
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
}
如果是master或slave,并且发布订阅连接(pc)还没有建立
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
...
订阅这个通道
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
...
}
...
}
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
...
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
发送info命令给非sentinel的节点(主节点或从节点)
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}
...
如果离上一次发送超过SENTINEL_PUBLISH_PERIOD(2秒)则再次发送sentinel hello
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
...
}
int sentinelSendHello(sentinelRedisInstance *ri) {
...
把自己的信息发送到通道
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL,payload);
...
}
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
...
if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
}
刷新实例信息
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
如果有新的从节点,则新增并写入配置文件
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
如果此节点是等待提升的节点
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
/* If this is a promoted slave we can change state to the
* failover state machine. */
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
...
设置重新配置从节点状态
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
...
}
...
}
检查是否满足主观下线
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
...
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
...
}
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
...
if (master->flags & SRI_S_DOWN) {
quorum = 1; 当前sentinel
di = dictGetIterator(master->sentinels);
查看其他sentinel的关于这个实例的信息
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
如果这个sentinel认为这个节点离线,则多数派+1
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
达到该主节点配置的quorum,客观下线
if (quorum >= master->quorum) odown = 1;
}
...
}
检查是否需要进行failover
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
...
主节点不在客观下线状态
if (!(master->flags & SRI_O_DOWN)) return 0;
有在进行中的failover
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
最近已经尝试过failover(距离上次failover开始时间不足failover_timeout的2倍)
if (mstime() - master->failover_start_time <
master->failover_timeout*2)
{
...
}
sentinelStartFailover(master);
...
}
void sentinelStartFailover(sentinelRedisInstance *master) {
...
failover等待开始状态
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
...
}
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
...
如果当前sentinel不认为这个主节点主观下线,则跳过
if ((master->flags & SRI_S_DOWN) == 0) continue;
如果这个sentinel已断开连接
if (ri->link->disconnected) continue;
如果不是强制询问,并且距离上一次收到它的回复还不到SENTINEL_ASK_PERIOD ms,则跳过
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
询问满足上述条件的其他sentinel这个主节点是否下线
ll2string(port,sizeof(port),master->addr->port);
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++;
}
}
更新sentinel failover状态机
/* Failover state machine driver for master `ri`: dispatches to the handler
 * that matches the current ri->failover_state. It returns without doing
 * anything unless SRI_FAILOVER_IN_PROGRESS is set on the master. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
/* Callers must pass a master instance. */
serverAssert(ri->flags & SRI_MASTER);
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
/* No default case: any other state is left untouched here, e.g.
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, which is checked for in
 * sentinelHandleDictOfRedisInstances() instead. */
}
}
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
}
failover状态机
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
...
检查我们是否还是leader
leader = sentinelGetLeader(ri, ri->failover_epoch);
非leader并且是非强制failover则跳过
if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
...
}
进入挑选从节点状态
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
}
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
挑选这个主节点的从节点
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
设置状态,让选出的从节点执行slaveof no one
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
}
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
...
mstime_t max_master_down_time = 0;
如果主节点是主观下线,则加主观下线到现在距离的时间
if (master->flags & SRI_S_DOWN)
max_master_down_time += mstime() - master->s_down_since_time;
max_master_down_time加master.down_after_period(就是配置中的down-after-milliseconds)的10倍
max_master_down_time += master->down_after_period * 10;
获取所有满足条件的从节点
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
如果从节点客观或者主观下线则跳过
if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
如果从节点已断开则跳过
if (slave->link->disconnected) continue;
如果从节点上一次可用距现在已超过SENTINEL_PING_PERIOD的5倍(一共5秒)则跳过
if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
如果从节点的优先级是0则跳过
if (slave->slave_priority == 0) continue;
如果主节点主观下线,info_validity_time是SENTINEL_PING_PERIOD的5倍,否则是SENTINEL_INFO_PERIOD的3倍
if (master->flags & SRI_S_DOWN)
info_validity_time = SENTINEL_PING_PERIOD*5;
else
info_validity_time = SENTINEL_INFO_PERIOD*3;
如果从节点的最新信息距离现在超过info_validity_time则跳过
if (mstime() - slave->info_refresh > info_validity_time) continue;
如果从节点和主节点的链接已断开时间超过max_master_down_time则跳过
if (slave->master_link_down_time > max_master_down_time) continue;
没有被跳过(满足所有条件)的实例加入候选列表
instance[instances++] = slave;
}
...
排序后选择第一个
if (instances) {
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
selected = instance[0];
}
}
排序算法:
1 更低的slave_priority
2 更大的replication offset
3 runid在字典序上更小
int compareSlavesForPromotion(const void *a, const void *b) {
}
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
发送slaveof noone ,rewrite config等命令
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
进入等待提升状态
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
}
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
...
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"SLAVEOF"),
host, portstr);
if (retval == C_ERR) return retval;
...
}
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
...
让从节点从新的主节点复制
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
...
}
leader相关
void sentinelCommand(client *c) {
...
else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
...
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}
返回leader
addReplyBulkCString(c, leader ? leader : "*");
...
}
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
如果请求的epoch大于当前sentinel的epoch,则更新当前sentinel的epoch
if (req_epoch > sentinel.current_epoch) {
sentinel.current_epoch = req_epoch;
}
如果主节点的leader_epoch小于请求的epoch而且当前sentinel的epoch小于等于请求的epoch,则更新主节点的leader和leader_epoch
if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
{
设置主节点的 leader为请求的sentinel节点
master->leader = sdsnew(req_runid);
master->leader_epoch = sentinel.current_epoch;
如果主节点的leader不是当前sentinel(strcasecmp返回非0),设置主节点的failover_start_time
if (strcasecmp(master->leader,sentinel.myid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
}
*leader_epoch = master->leader_epoch;
return master->leader ? sdsnew(master->leader) : NULL;
}
}
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
...
获取其他sentinel的投票
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
}
...
获得最大的投票数的sentinel
di = dictGetIterator(counters);
while((de = dictNext(di)) != NULL) {
uint64_t votes = dictGetUnsignedIntegerVal(de);
if (votes > max_votes) {
max_votes = votes;
winner = dictGetKey(de);
}
}
...
是否达成多数派
voters_quorum = voters/2+1;
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
winner = NULL;
...
}
当开始failover时,epoch会增加
void sentinelStartFailover(sentinelRedisInstance *master) {
master->failover_epoch = ++sentinel.current_epoch;
}
sentinel获取其他sentinel节点相关
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
...
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}
void sentinelProcessHelloMessage(char *hello, int hello_len) {
...
添加sentinel节点
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
...
}