redis sentinel 源码分析

概述

所有sentinel节点会订阅master的__sentinel__:hello通道来发现其他sentinel节点
所有sentinel节点周期性的调用info命令检查所有非sentinel的redis节点
所有sentinel会周期性的对非sentinel的redis主节点进行主观下线检查,以及询问其他sentinel该主节点是否离线
所有sentinel会周期性的检查非sentinel的redis主节点是否满足客观下线条件(认为它主观下线的sentinel数量达到该主节点配置的quorum),满足且需要开始failover时,epoch+1,并进入等待failover开始状态
进入等待failover状态会尝试获得leader节点,如果不是自己就放弃failover操作,如果是自己则进入挑选从节点状态
进入挑选从节点状态后,选出一个从节点,然后进入slaveof noone状态
进入slaveof noone状态后发送slaveof no one命令给该从节点,然后进入等待提升状态
在获取info时候检查到此非sentinel的redis从节点在等待提升状态,则写入配置文件,调用脚本,进入重新配置从节点状态
进入重新配置从节点状态后,向所有从节点发送slaveof <new master address>,然后进入更新配置状态
进入更新配置状态后,更新sentinel内存中非sentinel的redis主和从节点的信息并写入配置文件

sentinel 的leader节点当认为非sentinel的redis主节点客观下线后,则

前置代码

int main(int argc, char **argv) {
...
initServer()
...
}
void initServer(void) {
...
   if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
    if (server.sentinel_mode) sentinelTimer();
...
}

/* Periodic sentinel entry point, invoked from serverCron() when
 * server.sentinel_mode is set.
 *
 * Runs the tilt-condition check, then walks sentinel.masters via
 * sentinelHandleDictOfRedisInstances(), then calls the three script
 * helpers (run pending / collect terminated / kill timed-out, going by
 * their names). */
void sentinelTimer(void) {
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /* Re-randomize the timer frequency on every call: server.hz becomes
     * CONFIG_DEFAULT_HZ plus a random offset in [0, CONFIG_DEFAULT_HZ).
     * NOTE(review): presumably this keeps different sentinels from running
     * in lockstep -- confirm against the upstream comment. */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

相关代码

void sentinelHandleDictOfRedisInstances(dict *instances) {
...
 di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

        重置旧主节点,把它的地址修改为新提升的从节点的地址,并重新添加其他从节点

void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
...   
 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
...
    sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
    oldaddr = master->addr;
    master->addr = newaddr;
    master->o_down_since_time = 0;
    master->s_down_since_time = 0;

for (j = 0; j < numslaves; j++) {
        sentinelRedisInstance *slave;

        slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
                    slaves[j]->port, master->quorum, master);
        releaseSentinelAddr(slaves[j]);
        if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
    }
    zfree(slaves);
}
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {


}
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
  重连
    sentinelReconnectInstance(ri);
发送命令
    sentinelSendPeriodicCommands(ri);
...
检查主观下线
    sentinelCheckSubjectivelyDown(ri);
  
    只针对主节点
    if (ri->flags & SRI_MASTER) {
        检查是否客观下线
        sentinelCheckObjectivelyDown(ri);
      是否需要开始failover
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }

}

void sentinelReconnectInstance(sentinelRedisInstance *ri) {

...
重连
 if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
}

如果是master或slave
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
...
订阅这个通道
 retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL); 
...
}
...
}

#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
...
   if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
      发送info命令给非sentinel节点(主节点或从节点)
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }
...
如果离上一次发送超过SENTINEL_PUBLISH_PERIOD(2秒)则再次发送sentinel hello
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
...
}

int sentinelSendHello(sentinelRedisInstance *ri) {
...
把自己的信息发送到通道
snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, sentinel.myid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"PUBLISH"),
        SENTINEL_HELLO_CHANNEL,payload);
...
}

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
...

    if (r->type == REDIS_REPLY_STRING)
        sentinelRefreshInstanceInfo(ri,r->str);
}

刷新实例信息
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
       如果有新的从节点,则新增并写入配置文件
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                            atoi(port), ri->quorum, ri)) != NULL)
                {
                    sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                    sentinelFlushConfig();
                }
            }

如果此节点是等待提升的节点
   if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
...
设置重新配置从节点状态
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;

...

}
...
}

检查是否满足主观下线
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
...
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }

...
}

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
...
   if (master->flags & SRI_S_DOWN) {
        quorum = 1; 当前sentinel
        di = dictGetIterator(master->sentinels);
    查看其他sentinel的关于这个实例的信息
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            如果这个sentinel认为这个节点离线,则quorum计数+1
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
计数达到该主节点配置的quorum,客观下线
        if (quorum >= master->quorum) odown = 1;
    }
...

}

检查是否需要进行failover
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
...
 主节点不在客观下线状态

if (!(master->flags & SRI_O_DOWN)) return 0;

 有在进行中的failover
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

如果最近已经尝试过failover(距上次开始时间不足failover_timeout的2倍),则暂不再次发起
 if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
...
}
    sentinelStartFailover(master);

...
}
void sentinelStartFailover(sentinelRedisInstance *master) {
...
failover等待开始状态
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
...
}

void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
...
    
    如果当前sentinel不认为这个主节点主观下线则跳过
        if ((master->flags & SRI_S_DOWN) == 0) continue;
如果这个sentinel已断开连接
        if (ri->link->disconnected) continue;
如果不是强制询问,并且距离上一次收到他的回复还不到SENTINEL_ASK_PERIOD ms则跳过
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

询问满足上述条件的其他sentinel这个主节点是否下线
ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
  }
}

更新sentinel failover状态机
/* Advance the failover state machine of a master by one step.
 *
 * ri must be a master instance (asserted). Nothing is done unless the
 * SRI_FAILOVER_IN_PROGRESS flag is set; otherwise the handler matching
 * ri->failover_state is invoked once. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    /* No failover running for this master: nothing to drive. */
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    /* One handler per state. States without a case here (for example
     * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, which is checked in
     * sentinelHandleDictOfRedisInstances) leave the switch with no action. */
    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {

}

failover状态机

void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
...
检查我们是否还是leader
    leader = sentinelGetLeader(ri, ri->failover_epoch);

非leader并且是非强制failover则跳过
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
...
}
进入挑选从节点状态
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;

}

void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
挑选这个主节点的从节点
 sentinelRedisInstance *slave = sentinelSelectSlave(ri);

设置状态,让选出的从节点执行slaveof no one
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
    
}

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
...
 mstime_t max_master_down_time = 0;
如果主节点是主观下线,则加主观下线到现在距离的时间
 if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
max_master_down_time加master.down_after_period(就是配置中的down-after-milliseconds)的10倍
 max_master_down_time += master->down_after_period * 10;

获取所有满足条件的从节点
 while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;
    如果从节点客观或者主观下线则跳过
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
如果从节点已断开则跳过
        if (slave->link->disconnected) continue;
如果从节点上一次可用距现在已超过SENTINEL_PING_PERIOD的5倍(一共5秒)则跳过
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
如果从节点的优先级是0则跳过
        if (slave->slave_priority == 0) continue;

    如果主节点主观下线,info_validity_time是SENTINEL_PING_PERIOD的5倍,否则是SENTINEL_INFO_PERIOD的3倍

        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
如果从节点的最新信息距离现在超过info_validity_time则跳过
        if (mstime() - slave->info_refresh > info_validity_time) continue;
如果从节点和主节点的链接已断开时间超过max_master_down_time则跳过
        if (slave->master_link_down_time > max_master_down_time) continue;
没有被跳过(满足全部条件)的实例加入候选列表
        instance[instances++] = slave;
    }
...
排序后选择第一个
if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
}


排序算法:
1 更低的slave_priority
2 更大的replication offset
3  runid在字典序上更小
int compareSlavesForPromotion(const void *a, const void *b) {
}

void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
发送slaveof noone ,rewrite config等命令
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);

进入等待提升状态
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;

}
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
...
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return retval;
...
}

void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
...
让从节点从新的主节点复制
  retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
...
}

leader相关

void sentinelCommand(client *c) {
...
 else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
...
      if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }
返回leader
        addReplyBulkCString(c, leader ? leader : "*");

...
}


char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
如果请求的epoch大于当前sentinel的epoch,则更新当前sentinel的epoch
if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        
    }

如果主节点的leader_epoch小于请求的epoch而且当前sentinel的epoch小于等于请求的epoch,则更新主节点的leader和leader_epoch
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
设置主节点的 leader为请求的sentinel节点
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
      如果主节点的leader不是当前sentinel(即把票投给了其他sentinel),设置主节点的failover_start_time
        if (strcasecmp(master->leader,sentinel.myid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}
}

char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
...
获取其他sentinel的投票
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
...
获得最大的投票数的sentinel
 di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
...
得票数是否同时达到多数派(voters/2+1)和主节点配置的quorum,否则没有winner
 voters_quorum = voters/2+1;
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;
...
}

当开始failover时,epoch会增加
void sentinelStartFailover(sentinelRedisInstance *master) {

    master->failover_epoch = ++sentinel.current_epoch;
}

sentinel获取其他sentinel节点相关

void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
...
    sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);

}
void sentinelProcessHelloMessage(char *hello, int hello_len) {

...
添加sentinel节点
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
                            token[0],port,master->quorum,master);
...
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容

  • 故障转移 接着上章构建的sentinel网络构建后分析sentinel的故障转移。sentinel本身做为redi...
    ben1988阅读 3,406评论 1 0
  • Redis Sentinel 介绍与部署 1. Sentinel介绍 1.1 主从复制的问题 Redis主从复制可...
    56c60a7e3495阅读 1,020评论 0 1
  • 因为Redis拥有诸多优秀的特性,使用范围越来越广,系统对其可用性的依赖也越来越重,当前绝大部分系统使用的Redi...
    十毛tenmao阅读 414评论 0 1
  • 她永远记得,十七岁高中那年的午后。春光明媚。那个白衫飘飘的少年径直跑向在学校长廊看书的她,坚定的对...
    未生纸鸢阅读 291评论 0 0
  • 弟弟进步显而易见。 1 单单应变能力的变化就让我欢喜不已。 傍晚时分,抱着弟弟的我和邻居同辈妈妈在聊天,弟弟爸爸吃...
    师爷是枚美男子阅读 344评论 0 0