六、nginx的事件监听(参考《深入剖析Nginx》)(上)

 1、在第四章(nginx启动过程中的进程创建)中,提到子进程最后在for循环中进行事件处理,其相关代码具体定义为:

    static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_uint_t         i;
ngx_connection_t  *c;
ngx_process = NGX_PROCESS_WORKER;
ngx_worker_process_init(cycle, 1);

ngx_setproctitle("worker process");
//...

for ( ;; ) {
  // ...
    ngx_process_events_and_timers(cycle);
//  ...
  }
}

 在for循环中,不断执行ngx_process_events_and_timers来处理事件。在具体查看该函数定义之前,先了解一下其参数cycle的定义。

    struct ngx_cycle_s {
void                  ****conf_ctx;
ngx_pool_t               *pool;

ngx_log_t                *log;
ngx_log_t                 new_log;

ngx_connection_t        **files;
ngx_connection_t         *free_connections;
ngx_uint_t                free_connection_n;

ngx_queue_t               reusable_connections_queue;

ngx_array_t               listening;
ngx_array_t               pathes;
ngx_list_t                open_files;
ngx_list_t                shared_memory;

ngx_uint_t                connection_n;
ngx_uint_t                files_n;

ngx_connection_t         *connections;
ngx_event_t              *read_events;
ngx_event_t              *write_events;

ngx_cycle_t              *old_cycle;

ngx_str_t                 conf_file;
ngx_str_t                 conf_param;
ngx_str_t                 conf_prefix;
ngx_str_t                 prefix;
ngx_str_t                 lock_file;
ngx_str_t                 hostname;
};

 关于cycle的具体内涵,请查看nginx 源码学习笔记(十四)—— 全局变量ngx_cycle,在此不多做赘述。
  2、关于ngx_process_events_and_timers
 该函数位于ngx_event.c,其具体定义为:

      void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
    //...
}
    //多进程竞争端口,所以加锁
    if (ngx_use_accept_mutex) {
   //...
}
     //delta用于计算时间
    delta = ngx_current_msec;
    //进行事件处理
    (void) ngx_process_events(cycle, timer, flags);
    
    delta = ngx_current_msec - delta;

      //处理accept事件
     if (ngx_posted_accept_events) {
        ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
    //对ngx_use_accept_mutex解锁
    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
 }

    if (delta) {
        ngx_event_expire_timers();
}

    if (ngx_posted_events) {
      //...
}

  3、先关注一下ngx_process_events_and_timers的逻辑结构(简单来看),在多进程下,如下如所示:


image.png

  4、下面关注事件处理的函数ngx_process_events,发现它本身只是一个宏定义:

#define ngx_process_events   ngx_event_actions.process_events

 而 ngx_event_actions是ngx_event.c中的一个结构体:

    ngx_event_actions_t   ngx_event_actions;

 显然,关键在于结构体ngx_event_actions_t 的定义和初始化。先看看定义吧:

    typedef struct {
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                   ngx_uint_t flags);

    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;

 这个结构体中定义了大量的函数指针。上文的ngx_process_events_and_timers调用的应该是其中的

    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
               ngx_uint_t flags);

 那么问题来了,这些函数指针指向何处呢?

 5、nginx事件模块的初始化(略)
 在ngx_process_cycle.c中,有如下代码用于初始化各个模块:

    for (i = 0; ngx_modules[i]; i++) {
    if (ngx_modules[i]->init_process) {
        if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
            /* fatal */
            exit(2);
        }
    }
}

 而ngx_modules[i]->init_process也是函数指针,关于全局变量ngx_modules的初始化,我还未完全理解。但可以确定的是,若在conf文件中配置使用epoll作为IO多路复用的机制,则这里的ngx_event_actions_t结构体中的process_events函数将会绑定为epoll模块中的ngx_epoll_process_events函数。

 6、总结一下,至此,已经知道worker进程在循环中不断调用ngx_process_events_and _timers函数,若使用epoll,则该函数对应ngx_epoll_process_events。这个函数的主要作用在于把获取到的事件放进对应的事件队列,留待后续函数处理。其主要代码为:

    static ngx_int_t
    ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t    flags)
    {
        int                events;
        uint32_t           revents;
        ngx_int_t          instance, i;
        ngx_uint_t         level;
        ngx_err_t          err;
        ngx_event_t       *rev, *wev, **queue;
        ngx_connection_t  *c;


        events = epoll_wait(ep, event_list, (int) nevents, timer);

      //...

if (events == 0) {
 //...
}

ngx_mutex_lock(ngx_posted_events_mutex);

for (i = 0; i < events; i++) {
    //获取事件
    c = event_list[i].data.ptr;
    instance = (uintptr_t) c & 1;
    c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
    rev = c->read;

    if (c->fd == -1 || rev->instance != instance) {
    //...
    }
    revents = event_list[i].events;
            //...


    if ((revents & EPOLLIN) && rev->active) {

        if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
            rev->posted_ready = 1;

        } else {
            rev->ready = 1;
        }

        if (flags & NGX_POST_EVENTS) {
            queue = (ngx_event_t **) (rev->accept ?
                           &ngx_posted_accept_events : &ngx_posted_events);
            ngx_locked_post_event(rev, queue);

        } else {
            rev->handler(rev);
        }
    }

    wev = c->write;

    if ((revents & EPOLLOUT) && wev->active) {

        if (c->fd == -1 || wev->instance != instance) {
            //...
            continue;
        }

        if (flags & NGX_POST_THREAD_EVENTS) {
            wev->posted_ready = 1;

        } else {
            wev->ready = 1;
        }

        if (flags & NGX_POST_EVENTS) {
            ngx_locked_post_event(wev, &ngx_posted_events);

        } else {
            wev->handler(wev);
        }
    }
}

ngx_mutex_unlock(ngx_posted_events_mutex);
return NGX_OK;
}

 重点在于:

    if (flags & NGX_POST_EVENTS) {
        queue = (ngx_event_t **) (rev->accept ?
                       &ngx_posted_accept_events : &ngx_posted_events);
        ngx_locked_post_event(rev, queue);

    } else {
        rev->handler(rev);
    }

 若事件需要延后处理,则会将其放进队列中,留待ngx_event_process_posted等函数处理。若不是,则直接调用handler进行处理。

 7、限于时间,本人还未读懂事件处理的主要逻辑,所以本篇写得不甚清楚#=_=。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。