本文主要说明Redis监视器的实现代码。
I、监视器理论部分
1、客户端通过执行MONITOR
命令可以将自己变为一个监视器,实时地接受并打印出服务器当前处理的命令请求的相关信息,如:
此时,当其他客户端向服务器发送一条命令请求时,服务器除了会处理这条命令请求之外,还会将这条命令请求的信息发送给所有监视器:
II、代码实现
2.1 成为监视器
redisServer
维护一个monitors
的链表,记录自己的监视器,每次收到MONITOR
命令之后,只需将客户端追加到表尾即可:
/*src/redis.c/monitorCommand*/
void monitorCommand(redisClient *c) {
/* ignore MONITOR if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
listAddNodeTail(server.monitors,c);
addReply(c,shared.ok); //回复OK
}
2.2 向监视器传播命令
call
函数中有对于监视器命令传播的实现:
// call() 函数是执行命令的核心函数,这里只看监视器部分
/*src/redis.c/call*/
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
int client_old_flags = c->flags;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
......
}
call
主要调用了replicationFeedMonitors
,这个函数的作用就是将命令打包为协议,发送给监视器:
// 将协议发给 Monitor
/*src/replication.c/replicationFeedMonitors*/
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
// 获取时间戳
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & REDIS_LUA_CLIENT) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & REDIS_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
}
// 获取命令和参数
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == REDIS_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(REDIS_STRING,cmdrepr);
// 将内容发送给所有 MONITOR
listRewind(monitors,&li);
while((ln = listNext(&li))) {
redisClient *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》