系列
redis数据淘汰原理
redis过期数据删除策略
redis server事件模型
redis cluster mget 引发的讨论
redis 3.x windows 集群搭建
redis 命令执行过程
redis string底层数据结构
redis list底层数据结构
redis hash底层数据结构
redis set底层数据结构
redis zset底层数据结构
redis 客户端管理
redis 主从同步-slave端
redis 主从同步-master端
redis 主从超时检测
redis aof持久化
redis rdb持久化
redis 数据恢复过程
redis TTL实现原理
redis cluster集群建立
redis cluster集群选主
redis的主从超时检测主要从以下三个方面进行判断,分别是主监测从、从监测主、正常关闭。
- 主监测从:slave定期发送replconf ack offset命令到master来报告自己的存活状况
- 从监测主:master定期发送ping命令或者\n命令到slave来报告自己的存活状况
- 正常关闭:eventLoop监测端口关闭的nio事件
周期性发送心跳命令
定期执行函数serverCron内部会周期性的执行replicationCron方法,该方法内部执行的动作包括重连接主服务器、向主服务器发送 ACK 、判断数据发送失败情况、断开本服务器超时的从服务器,向从服务器发送PING或者\n命令。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
clientsCron();
// 对数据库执行各种操作
databasesCron();
// 重连接主服务器、向主服务器发送 ACK 、判断数据发送失败情况、断开本服务器超时的从服务器,等等
run_with_period(1000) replicationCron();
// 增加 loop 计数器
server.cronloops++;
return 1000/server.hz;
}
replicationCron内部做的事情就是周期性的发送心跳命令包括:
- slave发往master的replconf ack offset,用于主检测从的存活性
- master发往slave的PING命令,用于从检测主的存活性
- master发往slave的\n命令,用于从检测主的存活性
// 复制 cron 函数,每秒调用一次
void replicationCron(void) {
// 定期向主服务器发送 ACK 命令
if (server.masterhost && server.master &&
!(server.master->flags & REDIS_PRE_PSYNC))
replicationSendAck();
/*
* 如果服务器有从服务器,定时向它们发送 PING 。
*
* 这样从服务器就可以实现显式的 master 超时判断机制,
* 即使 TCP 连接未断开也是如此。
*/
if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
listIter li;
listNode *ln;
robj *ping_argv[1];
/* First, send PING */
// 向所有已连接 slave (状态为 ONLINE)发送 PING
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
/*
* 向那些正在等待 RDB 文件的从服务器(状态为 BGSAVE_START 或 BGSAVE_END)
* 发送 "\n"
*
* 这个 "\n" 会被从服务器忽略,
* 它的作用就是用来防止主服务器因为长期不发送信息而被从服务器误判为超时
*/
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry, it's just a ping. */
}
}
}
}
}
主监测从
master收到slave的replconf命令的时候更新c->repl_ack_time,也就是代表收到slave发送ack命令的时间。
/* REPLCONF <option> <value> <option> <value> ...
* 由 slave 使用,在 SYNC 之前配置复制进程(process)
* 目前这个函数的唯一作用就是,让 slave 告诉 master 它正在监听的端口号
* 然后 master 就可以在 INFO 命令的输出中打印这个号码了。
* 将来可能会用这个命令来实现增量式复制,取代 full resync 。
*/
void replconfCommand(redisClient *c) {
for (j = 1; j < c->argc; j+=2) {
else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
// 更新最后一次发送 ack 的时间
c->repl_ack_time = server.unixtime;
return;
}
}
addReply(c,shared.ok);
}
master从判断当前时间和上一次ack时间来判断slave的存,(server.unixtime - slave->repl_ack_time) > server.repl_timeout。如果超时就释放和slave的连接。
// 复制 cron 函数,每秒调用一次
void replicationCron(void) {
// 断开超时从服务器
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
// 遍历所有从服务器
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// 释放超时从服务器
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
char ip[REDIS_IP_STR_LEN];
int port;
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
redisLog(REDIS_WARNING,
"Disconnecting timedout slave: %s:%d",
ip, slave->slave_listening_port);
}
// 释放
freeClient(slave);
}
}
}
从监测主
slave每次接收到master发送过来的命令的时候都会更新client的上一次交互时间也就是c->lastinteraction,这里的client c代表就是slave连接master的server.master的redis client对象。
/*
* 读取客户端的查询缓冲区内容
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// 读入内容到查询缓存
nread = read(fd, c->querybuf+qblen, readlen);
if (nread) {
// 记录服务器和客户端最后一次互动的时间
c->lastinteraction = server.unixtime;
}
// 从查询缓存重读取内容,创建参数,并执行命令
// 函数会执行到缓存中的所有内容都被处理完为止
processInputBuffer(c);
}
slave定期检查当前时间和上一次交互时间的差值是否大于最大超时时间:(time(NULL)-server.master->lastinteraction) > server.repl_timeout,如果超时就断开连接。
// 复制 cron 函数,每秒调用一次
void replicationCron(void) {
// 尝试连接到主服务器,但超时
if (server.masterhost &&
(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
// 取消连接
undoConnectWithMaster();
}
// RDB 文件的传送已超时?
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
// 停止传送,并删除临时文件
replicationAbortSyncTransfer();
}
// 从服务器曾经连接上主服务器,但现在超时
if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
// 释放主服务器
freeClient(server.master);
}
// 尝试连接主服务器
if (server.repl_state == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
}
}
正常关闭
判断socket正常的关闭的途径就是通过socket的read方法来判断:
- 读取报文数据出错
- 读取报文长度为0,这里需要解释下:
1. TCP recv返回0, 说明对方关闭;
2. 注册EPOLLERR, 收到事件是关闭;
3. recv/send 返回-1时, 如果错误不是EWOULDBLOCK或者EINTR, 也主动关闭连接。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// 读入内容到查询缓存
nread = read(fd, c->querybuf+qblen, readlen);
// 读入出错
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
// 遇到 EOF
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
}