Redis单线程模型,从源码角度解析

简述

大家都知道Redis是单线程的内存数据库,数据存储在内存中,且使用的是单线程模型,因此速度极快;今天我们就来从源码的角度分析下Redis的单线程模型;
首先,我们这里使用的是Redis2.9.11版本的源码进行分析,Redis是基于C语言编写的,语法结构与Java不太一样,但Redis的代码足够精简,阅读起来还是比较容易的,这里我也添加了一些注释,github上有大神针对Redis源码的整个注释,大家也可以下载下来自己参考阅读一下。
Redis源码注释版:https://github.com/huangz1990/redis-3.0-annotated

主函数入口

我们先从main函数开始,在redis.c文件中,我们可以看到Redis启动的main函数,大概罗列做了以下几件事:

  • 检查是否是Sentinel模式启动;
  • 初始化服务器配置,在这里面初始化了LRU时间、RDB保存条件、命令、客户端输出缓冲区大小、慢查询日志等等;
  • 检查并加载配置文件和启动命令;
  • 初始化服务器,这里主要初始化的是Redis存储的数据结构,还进行了时间事件创建、打开TCP端口并监听端口、为TCP连接关联应答(accept)处理器(这里就是所谓的I/O多路复用器)、初始化Lua脚本系统、初始化慢查询功能、初始化后台持久化的线程(这里是主函数中唯一使用到多线程的地方);
  • 从AOF或者RDB中加载数据;
  • 检查内存配置,maxmemory配置项是否正确;
  • 运行事件处理器,以上执行完成,代表着服务器启动成功,在这个函数中是Redis工作的函数,我们下面展开讲一下;
  • 停止事件循环;
  • 返回0;
    这里我把源码贴到下面,大家可以看一下注释
int main(int argc, char **argv) {
    struct timeval tv;

    /* We need to initialize our libraries, and the server configuration. */
    // 初始化库
#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    setlocale(LC_COLLATE,"");
    zmalloc_enable_thread_safeness();
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());

    // 检查服务器是否以 Sentinel 模式启动
    server.sentinel_mode = checkForSentinelMode(argc,argv);

    // 初始化服务器
    initServerConfig();

    /* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
    // 如果服务器以 Sentinel 模式启动,那么进行 Sentinel 功能相关的初始化
    // 并为要监视的主服务器创建一些相应的数据结构
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    // 检查用户是否指定了配置文件,或者配置选项
    if (argc >= 2) {
        int j = 1; /* First option to parse in argv[] */
        sds options = sdsempty();
        char *configfile = NULL;

        /* Handle special options --help and --version */
        // 处理特殊选项 -h 、-v 和 --test-memory
        if (strcmp(argv[1], "-v") == 0 ||
            strcmp(argv[1], "--version") == 0) version();
        if (strcmp(argv[1], "--help") == 0 ||
            strcmp(argv[1], "-h") == 0) usage();
        if (strcmp(argv[1], "--test-memory") == 0) {
            if (argc == 3) {
                memtest(atoi(argv[2]),50);
                exit(0);
            } else {
                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
                exit(1);
            }
        }

        /* First argument is the config file name? */
        // 如果第一个参数(argv[1])不是以 "--" 开头
        // 那么它应该是一个配置文件
        if (argv[j][0] != '-' || argv[j][1] != '-')
            configfile = argv[j++];

        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual file name
         * is parsed, if any. */
        // 对用户给定的其余选项进行分析,并将分析所得的字符串追加稍后载入的配置文件的内容之后
        // 比如 --port 6380 会被分析为 "port 6380\n"
        while(j != argc) {
            if (argv[j][0] == '-' && argv[j][1] == '-') {
                /* Option name */
                if (sdslen(options)) options = sdscat(options,"\n");
                options = sdscat(options,argv[j]+2);
                options = sdscat(options," ");
            } else {
                /* Option argument */
                options = sdscatrepr(options,argv[j],strlen(argv[j]));
                options = sdscat(options," ");
            }
            j++;
        }
        if (configfile) server.configfile = getAbsolutePath(configfile);
        // 重置保存条件
        resetServerSaveParams();

        // 载入配置文件, options 是前面分析出的给定选项
        loadServerConfig(configfile,options);
        sdsfree(options);

        // 获取配置文件的绝对路径
        if (configfile) server.configfile = getAbsolutePath(configfile);
    } else {
        redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    }

    // 将服务器设置为守护进程
    if (server.daemonize) daemonize();

    // 创建并初始化服务器数据结构
    initServer();

    // 如果服务器是守护进程,那么创建 PID 文件
    if (server.daemonize) createPidFile();

    // 为服务器进程设置名字
    redisSetProcTitle(argv[0]);

    // 打印 ASCII LOGO
    redisAsciiArt();

    // 如果服务器不是运行在 SENTINEL 模式,那么执行以下代码
    if (!server.sentinel_mode) {
        /* Things not needed when running in Sentinel mode. */
        // 打印问候语
        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
    #ifdef __linux__
        // 打印内存警告
        linuxOvercommitMemoryWarning();
    #endif
        // 从 AOF 文件或者 RDB 文件中载入数据
        loadDataFromDisk();
        // 启动集群?
        if (server.cluster_enabled) {
            if (verifyClusterConfigWithData() == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "You can't have keys in a DB different than DB 0 when in "
                    "Cluster mode. Exiting.");
                exit(1);
            }
        }
        // 打印 TCP 端口
        if (server.ipfd_count > 0)
            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
        // 打印本地套接字端口
        if (server.sofd > 0)
            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
    } else {
        sentinelIsRunning();
    }

    /* Warning the user about suspicious maxmemory setting. */
    // 检查不正常的 maxmemory 配置
    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
        redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }

    // 运行事件处理器,一直到服务器关闭为止
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);

    // 服务器关闭,停止事件循环
    aeDeleteEventLoop(server.el);

    return 0;
}

事件处理器

在Redis初始化完成后,会进入aeMain函数,这里是Redis主要的事件处理器,我们可以看下内部是如何做的;

/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

我们可以看到aeMain内部是一个while的死循环,真正的事件处理又交给了aeProcessEvents函数,在aeProcessEvents函数中主要处理Redis的两种事件,即时间事件和文件事件;
大概罗列aeProcessEvents函数做以下几件事:

  • 获取即将到达的时间事件和已经达到的时间事件;
  • 处理文件事件,在Redis中所有的socket都被视为文件事件,在这里会获取所有的文件事件,循环处理读事件和写事件;
  • 开始处理到达的时间事件;
  • 返回已处理的文件/时间事件数量;
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 执行到这一步,说明没有时间事件
            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到达为止
                tvp = NULL; /* wait forever */
            }
        }

        // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

总结

总结一下,从源码的角度分析,Redis的单线程模型,就是在Redis主函数启动后,没有再创建过新的线程;
Redis在处理文件事件是也就是读取socket缓冲区中的数据,并进行解析响应的;在这里Redis对于socket监听就是所谓I/O多路复用器,而Redis处理事件一直是基于一个while循环来处理的;

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 226,130评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 97,210评论 3 410
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 173,639评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,755评论 1 304
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,694评论 6 404
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 54,160评论 1 317
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,359评论 3 433
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,436评论 0 282
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,990评论 1 328
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,981评论 3 351
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,089评论 1 359
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,669评论 5 353
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,363评论 3 342
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,762评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,950评论 1 278
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,689评论 3 384
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,124评论 2 368

推荐阅读更多精彩内容