This article looks at how the AOF persistence strategy is implemented, covering how AOF data is organized and how the mechanism operates.
Recommended reading:
1. For the theory behind Redis AOF persistence, see: Redis之AOF持久化小探
I. The Big Picture
To better understand AOF persistence, we first split it into three parts: appending, writing, and syncing:
1.1 Appending
The struct redisServer structure maintains an AOF buffer. After executing a write command, the server calls feedAppendOnlyFile to append the command, in protocol format, to the aof_buf field of the server state (this buffer lives in memory):
/*src/redis.h*/
struct redisServer {
...
// AOF buffer
sds aof_buf;
};
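For completeness, aof_buf starts out as an empty sds string when the server initializes; a minimal sketch of that setup (assuming the usual initServer() path, which may differ slightly between versions):
/* During server initialization the AOF buffer starts out empty;
 * commands are appended to it later by feedAppendOnlyFile(). */
server.aof_buf = sdsempty();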
1.2 Writing and Syncing
1. To understand writing and syncing, we first need to understand the buffering involved: to improve write efficiency, modern operating systems do not put data on disk immediately when a user calls write(); instead the data is held temporarily in a kernel buffer, and only once that buffer fills up, or a time limit passes, is it actually written to the disk. Note that this kernel buffer is separate from the aof_buf described in 1.1: aof_buf is Redis's own user-space buffer, whose contents are handed to the kernel with write() and are only guaranteed to reach the disk after fsync().
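The relationship between the two layers can be illustrated with a minimal, standalone C sketch (this is not Redis code; the file name and payload are made up):
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int main(void) {
    /* data accumulated in user space, playing the role of aof_buf */
    const char *buf = "*1\r\n$4\r\nPING\r\n";

    int fd = open("appendonly.demo", O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd == -1) return 1;

    if (write(fd, buf, strlen(buf)) == -1) { /* copies data into the kernel's buffer cache */
        close(fd);
        return 1;
    }
    fsync(fd);                               /* forces the cached data onto the physical disk */

    close(fd);
    return 0;
}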
2. Because the server may execute write commands while processing file events, new content keeps getting appended to aof_buf. For this reason, before finishing each iteration of the event loop the server calls flushAppendOnlyFile to decide whether the contents of the aof_buf buffer should be written and saved to the AOF file. The flushAppendOnlyFile function is examined in detail below.
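Which path flushAppendOnlyFile takes depends on the AOF options in redis.conf; a typical configuration looks like this (everysec is the commonly used trade-off):
# Enable AOF persistence and choose the fsync policy (always / everysec / no)
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
# Optionally skip fsync while a BGSAVE/BGREWRITEAOF child is doing I/O
no-appendfsync-on-rewrite no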
II. Command Propagation
The propagate() function propagates a command to the AOF and to the slaves. Slave propagation belongs to Redis replication, so we leave it aside here. propagate() feeds the command to the AOF by calling the feedAppendOnlyFile() function:
/* Propagate the command to the AOF and to the slaves */
/*src/redis.c/propagate*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// AOF must be enabled and the AOF propagation flag set: append the command to the local AOF
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
// The replication flag is set: propagate the command to the slaves
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
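For context, propagate() is invoked from the command execution path. The sketch below is simplified from the call() function in src/redis.c (flag handling is abridged and may differ across versions): the command is only propagated if it actually modified the dataset, which is tracked through server.dirty:
/* Simplified sketch of the caller (based on src/redis.c/call); not complete. */
dirty = server.dirty;
c->cmd->proc(c);                    /* execute the command */
dirty = server.dirty - dirty;       /* did it change the dataset? */
if (flags & REDIS_CALL_PROPAGATE) {
    int pflags = REDIS_PROPAGATE_NONE;
    if (dirty) pflags |= (REDIS_PROPAGATE_AOF | REDIS_PROPAGATE_REPL);
    if (pflags != REDIS_PROPAGATE_NONE)
        propagate(c->cmd, c->db->id, c->argv, c->argc, pflags);
}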
Now let's look at what the feedAppendOnlyFile function called above actually does:
/* Append the command to the aof_buf buffer; if an AOF rewrite is in progress, also append it to the rewrite buffer */
/*src/aof.c/feedAppendOnlyFile*/
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
// EXPIRE, PEXPIRE and EXPIREAT commands
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
// SETEX and PSETEX commands
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
// SET
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
// PEXPIREAT
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
// All other commands
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed.
*/
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file.
*/
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
// Free the temporary buffer
sdsfree(buf);
}
The comment above makes an important point: the command is appended to the AOF buffer, and these commands are flushed to disk before the event loop is re-entered, i.e. before the client receives its reply. Note the timing of the reply: this is the usual durability pattern in databases, where the data must be persisted first and only then is the reply returned to the client. If it were done the other way around, replying first and persisting later, a crash in between would leave the client believing a write succeeded that was never saved, producing an inconsistency.
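As a concrete illustration (the key, value and timestamp below are made up, and \r\n is shown literally instead of the actual CRLF bytes): if a client issues SETEX msg 100 hello against database 0 while aof_selected_db is not yet 0, feedAppendOnlyFile appends roughly the following protocol text to aof_buf; note how SETEX is rewritten into SET plus PEXPIREAT with an absolute millisecond timestamp:
*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n
*3\r\n$9\r\nPEXPIREAT\r\n$3\r\nmsg\r\n$13\r\n1700000100000\r\n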
III. Syncing to Disk
Syncing means writing everything accumulated in aof_buf out to the AOF file and onto disk; this is done by the flushAppendOnlyFile() function:
// Sync to disk: write all updates accumulated in server.aof_buf to the AOF file
/*src/aof.c/flushAppendOnlyFile*/
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
*
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync.
*/
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
// Nothing in the buffer: return right away
if (sdslen(server.aof_buf) == 0) return;
// Policy: fsync every second
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// Is an fsync already running in the background?
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
// Policy is fsync-every-second and the force flag is not set
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
*
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds (if we forced the write here,
* the main server thread would block on write). */
if (sync_in_progress) {
// An fsync is still running in the background ...
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing: record the time at which we started
* postponing the flush and return, without doing any write or fsync
* this time. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again.
*/
return;
}
/* Otherwise fall through, and go write since we can't wait
* over two seconds: the write has already been postponed for >= 2
* seconds, so do it now (the write(2) below may block). */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* If you are following this code path, then we are going to write so
* reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
*
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike (if that happens the AOF may be left truncated, and the
* redis-check-aof tool can then be used to repair it). */
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
redisLog(REDIS_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// Try to remove the newly appended, incomplete content
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftrunacate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synched on disk. */
redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = REDIS_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
// The write succeeded: restore the last-write status to OK
if (server.aof_last_write_status == REDIS_ERR) {
redisLog(REDIS_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = REDIS_OK;
}
}
// Update the recorded AOF file size after the write
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary).
*/
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
// Clear the buffer contents so it can be reused
sdsclear(server.aof_buf);
} else {
// Free the buffer and start with a new, empty one
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background.
*/
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
/* Perform the fsync if needed. */
// Policy: always fsync
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
// Update the time of the last fsync
server.aof_last_fsync = server.unixtime;
// Policy: fsync every second, and more than one second has passed since the last fsync
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// Hand the fsync to the background thread
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
// Update the time of the last fsync
server.aof_last_fsync = server.unixtime;
}
// Note: both branches above update the last-fsync timestamp,
// so that assignment could arguably be moved down here:
// server.aof_last_fsync = server.unixtime;
}
The key operation here is fsync: a POSIX I/O synchronization call that forces the kernel to flush its cached data for the file onto the physical disk. Because fsync can block, under the everysec policy Redis hands it off to a background I/O thread.
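For reference, aof_fsync is a macro (roughly as defined in src/config.h) that maps to fdatasync() on Linux to avoid flushing metadata, and aof_background_fsync() simply queues an fsync job for the bio background thread; the sketch below is simplified from src/config.h and src/aof.c and may differ slightly between versions:
/* src/config.h (simplified): prefer fdatasync() on Linux to avoid flushing metadata */
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif

/* src/aof.c (simplified): queue the fsync as a job for the background I/O thread */
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC, (void*)(long)fd, NULL, NULL);
}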
IV. AOF Rewrite
The background (child-process) AOF rewrite is performed by the rewriteAppendOnlyFileBackground function:
/*src/aof.c/rewriteAppendOnlyFileBackground*/
/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
*
* 2) Redis calls this function, that forks():
*
*    2a) the child rewrites the append only file in a temp file.
*
*    2b) the parent accumulates differences in server.aof_rewrite_buf.
*
* 3) When the child finishes '2a', it exits.
*
* 4) The parent will trap the exit code, if it's OK, will append the
*    data accumulated into server.aof_rewrite_buf into the temp file, and
*    finally will rename(2) the temp file in the actual file name.
*    The new file is reopened as the new append only file. Profit!
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
// A child process is already rewriting the AOF
if (server.aof_child_pid != -1) return REDIS_ERR;
// Record the time before fork(), used to measure how long the fork takes
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256]; // temporary file name
/* Child process setup */
// Close the listening socket fds
closeListeningSockets(0);
// Set the process title so the child is easy to identify
redisSetProcTitle("redis-aof-rewrite");
// Create the temp file and perform the AOF rewrite
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
/* rewriteAppendOnlyFile writes the rebuilt commands into tmpfile; it is explained below */
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
// Signal success to the parent by exiting with status 0
exitFromChild(0);
} else {
// Signal failure by exiting with status 1
exitFromChild(1);
}
} else {
/* Parent */
// Record how long the fork() took
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
// Record the AOF rewrite state
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
// Disable automatic dict resizing/rehashing while the child process exists
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge.
*/
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
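Besides a client issuing BGREWRITEAOF explicitly, serverCron() also calls rewriteAppendOnlyFileBackground() automatically once the AOF has grown enough; the growth thresholds come from redis.conf (the values below are the common defaults):
# Trigger an automatic rewrite when the AOF has grown by 100% since the last
# rewrite, but never for files smaller than 64 MB
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb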
As we can see, rewriteAppendOnlyFileBackground mainly delegates to the rewriteAppendOnlyFile function to perform the rewrite. Let's look at what that function does, which is where the actual AOF contents are produced:
// Main AOF rewrite function, called from rewriteAppendOnlyFileBackground()
/*src/aof.c/rewriteAppendOnlyFile*/
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
// Open the temp file
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in"
"rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
// Initialize the rio structure around the file
rioInitWithFile(&aof,fp);
// If incremental fsync during rewrite is enabled, configure the rio to auto-sync periodically
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
// Dump every database
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
// Get a safe iterator over the database's key space
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
// Emit a SELECT command for this database
/* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
// Write the database number
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
// Wrap keystr in a stack-allocated robj
initStaticStringObject(key,keystr);
// Get the key's expire time
expiretime = getExpire(db,&key);
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
// Release the iterator
dictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp append only file on the "
"final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
// Error handling: clean up
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: "
"%s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
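To make the output concrete: for a hypothetical database 0 containing one string key and one two-element list (keys and values made up, \r\n shown literally), the rewritten file would contain roughly the following, one reconstructing command per key rather than the full command history:
*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n
*4\r\n$5\r\nRPUSH\r\n$4\r\nlist\r\n$1\r\na\r\n$1\r\nb\r\n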
When the child process finishes the AOF rewrite, the parent calls the backgroundRewriteDoneHandler function, which writes the contents of the rewrite buffer server.aof_rewrite_buf_blocks into the new AOF file:
/*src/aof.c/backgroundRewriteDoneHandler*/
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
......
// Write the AOF rewrite buffer server.aof_rewrite_buf_blocks to the new AOF file
// This call uses write() and can block the main process
if (aofRewriteBufferWrite(newfd) == -1) {
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s",
strerror(errno));
close(newfd);
goto cleanup;
}
......
}
The key step here is the call to the aofRewriteBufferWrite function, which performs the actual append:
/*src/aof.c/aofRewriteBufferWrite*/
// Write the accumulated rewrite buffer server.aof_rewrite_buf_blocks to the given fd
/* Write the buffer (possibly composed of multiple blocks) into the specified
* fd. If a short write or any other error happens -1 is returned,
* otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;
listRewind(server.aof_rewrite_buf_blocks,&li);
// Iterate over every buffer block
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;
if (block->used) {
// Write this block's contents
nwritten = write(fd,block->buf,block->used);
if (nwritten != block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}
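The rewrite buffer is a linked list of fixed-size blocks rather than a single sds string; for reference, the block structure looks roughly like this in src/aof.c (the block size may differ between versions):
/* Sketch of the rewrite-buffer block, simplified from src/aof.c */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

typedef struct aofrwblock {
    unsigned long used, free;                   /* bytes used / bytes still free in buf */
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;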
V. AOF Loading (Recovery)
Re-reading the AOF file through a fake client to restore the dataset is implemented mainly in the loadAppendOnlyFile function:
// Load the AOF file and restore the data
/*src/aof.c/loadAppendOnlyFile*/
/* Replay the append log file. On success REDIS_OK is returned. On non fatal
* error (the append only file is zero-length) REDIS_ERR is returned. On
* fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
struct redisClient *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
// A zero-length AOF file: nothing to load
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return REDIS_ERR;
}
if (fp == NULL) {
redisLog(REDIS_WARNING,"Fatal error: can't open the append log file "
"for reading: %s",strerror(errno));
exit(1);
}
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */
server.aof_state = REDIS_AOF_OFF;
// Create a fake client (a redisClient without a real network connection)
fakeClient = createFakeClient();
startLoading(fp);
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
// Every 1000 commands replayed, let the server also handle pending events;
// aeProcessEvents() runs one pass of the event loop
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
// fgets returning NULL usually means we reached the end of the AOF file
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
break;
else
goto readerr;
}
// The line must start with '*'; otherwise the format is wrong
if (buf[0] != '*') goto fmterr;
// Number of arguments
argc = atoi(buf+1);
// Invalid argument count
if (argc < 1) goto fmterr;
// Allocate space for the argument vector
argv = zmalloc(sizeof(robj*)*argc);
// Read each argument in turn
for (j = 0; j < argc; j++) {
if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
if (buf[0] != '$') goto fmterr;
len = strtol(buf+1,NULL,10);
argsds = sdsnewlen(NULL,len);
if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
argv[j] = createObject(REDIS_STRING,argsds);
if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
}
// Look up the command
/* Command lookup */
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
redisLog(REDIS_WARNING,"Unknown command '%s' reading the "
"append only file", (char*)argv[0]->ptr);
exit(1);
}
// Execute the command through the fake client, as if serving a real request, which restores the data
/* Run the command in the context of a fake client */
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
/* The fake client should not have a reply */
redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply)
== 0);
/* The fake client should never get blocked */
redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
// Free the argument objects
/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
for (j = 0; j < fakeClient->argc; j++)
decrRefCount(fakeClient->argv[j]);
zfree(fakeClient->argv);
}
/* This point can only be reached when EOF is reached without errors.
* If the client is in the middle of a MULTI/EXEC, log error and quit. */
if (fakeClient->flags & REDIS_MULTI) goto readerr;
// Clean up
fclose(fp);
freeFakeClient(fakeClient);
// Restore the previous AOF state
server.aof_state = old_aof_state;
stopLoading();
// Record the current AOF file size
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
return REDIS_OK;
readerr:
// Read error: report and exit
if (feof(fp)) {
redisLog(REDIS_WARNING,"Unexpected end of file reading the append "
"only file");
} else {
redisLog(REDIS_WARNING,"Unrecoverable error reading the append only "
"file: %s", strerror(errno));
}
exit(1);
fmterr:
redisLog(REDIS_WARNING,"Bad file format reading the append only file: "
"make a backup of your AOF file, then use ./redis-check-aof --fix "
"<filename>");
exit(1);
}
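Finally, at startup the server prefers replaying the AOF over loading the RDB snapshot whenever AOF is enabled, since the AOF usually holds the more complete dataset. A simplified sketch of that decision, based on loadDataFromDisk() in src/redis.c (field names and log text may vary between versions):
/* Simplified sketch of startup loading, based on src/redis.c/loadDataFromDisk() */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        /* AOF enabled: rebuild the dataset by replaying the append only file */
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE, "DB loaded from append only file: %.3f seconds",
                (float)(ustime()-start)/1000000);
    } else {
        /* Otherwise fall back to the RDB snapshot */
        if (rdbLoad(server.rdb_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE, "DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
    }
}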
References
[1] 《Redis设计与实现》 (Redis Design and Implementation)