libuv-v1.11.0 - 源码剖析 - 01 - uv_loop_t
> uv_loop_t
struct {
void *data;
unsigned int active_handles; /*
活跃的handle计数。
uv__handle_start()时若handle有UV__HANDLE_REF标志,\
此处++。
uv__handle_stop()时若handle有UV__HANDLE_REF标志,\
此处--。
*/
void* handle_queue[2]; /*
存放全部handle的队列。
uv__handle_init()向其尾追加handle。
uv_loop内部执行uv__finish_close(handle)时,handle出队。
*/
void* active_reqs[2]; /*
存放全部req的队列。
uv__req_register()向其尾部追加req。
uv__req_unregister()将req从本队列移除。
*/
unsigned int stop_flag; /* 退出uv_run的标志,初始0,uv_stop()设其为1 */
// UV_LOOP_PRIVATE_FIELDS (uv-unix.h)
unsigned long flags;
int backend_fd; /* uv底层io时使用的io模型。unix上为kqueue,linux为epoll,... */
void *pending_queue[2]; /* 队列中存放等待执行回调的uv__io_t */
void *watcher_queue[2]; /* 队列中存放被监听的uv__io_t */
uv__io_t ** watchers; /* 存放被监听的uv__io_t指针的数组,以对应的fd作为数组索引 */
unsigned int nwatchers; /* nwatchers+2 是上面watchers数组的长度 */
unsigned int nfds; /* number of fds */
void* wq[2]; /*
uv_work在threadpool中执行完uv__work的wrok()函数后,\
将uv__work转移到此队列中,\
并用loop->wq_async通知uv_loop执行其done()函数。
*/
uv_mutex_t wq_mutex;
uv_async_t wq_async; /*
uv_work在threadpool中执行完uv__work的work()函数后,
将uv__work转移到loop->wq队列中,并用wq_async通知uv_loop。\
wq_async的回调函数是uv__work_done(),\
uv__work_done()遍历loop->wq队列并执行各元素的done()函数。
*/
uv_rwlock_t cloexec_lock;
uv_handle_t* closing_handles; /*
等待执行close的handle。\
uv_close()时将handle添加到队首。\
uv_loop的loop iteration的最后会\
执行队列中handle的close操作,并清空队列。
*/
void* process_handles[2];
void* prepare_handles[2]; /* 队列中存放活跃的uv_prepare_t */
void* check_handles[2]; /* 队列中存放活跃的uv_check_t */
void* idle_handles[2]; /*
队列中存放活跃的uv_idle_t
uv_idle_start()时入队首。
uv_idle_stop()时出队。
uv_loop每次loop iteration会遍历idle_handles中\
的idle并执行其idle_cb。
*/
void* async_handles[2]; /*
uv_async_t的队列。
uv_async_init()时追加uv_async_t到尾部。
uv_close()时从队列移除uv_async_t。
uv_loop的循环中,通过io管道来监听是否有异步事件,\
如果有异步事件的话,uv_loop会遍历本队列中的async,\
执行需要回调的async。
*/
struct uv__async async_watcher; /*
在第一次调用uv_async_init()时初始化。
uv_loop从一个io管道监听是否有异步事件,\
当有异步事件时,会调用async_watcher.cb,\
async_wacther.cb在初始时被置为uv__async_event()方法,\
该方法内会遍历上面的async_handles,从而执行异步回调。
*/
struct {
void* min;
unsigned int nelts;
} timer_heap; /*
定时器时间堆。
uv_timer_start()向时间堆插入节点。
uv_timer_stop()从时间堆移除节点。
uv_loop内部执行uv__run_timers()时从时间堆中\
取出已超时的uv_timer_t,执行其回调。
*/
uint64_t timer_counter; /*
定时器start_id计数器。
每次调用uv_timer_start()时,timer_counter++作为timer的start_id。
*/
uint64_t time; /* 存储当前时间,用此时间判断定时器是否超时。 */
int signal_pipefd[2];
uv__io_t signal_io_watcher;
uv_signal_t child_watcher;
int emfile_fd;
// UV_PLATFORM_LOOP_FIELDS (uv-linux.h)
uv__io_t inotify_read_watcher;
void* inotify_watchers;
int inotify_fd;
};
> uv_run()
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
> uv__loop_alive()
static int uv__loop_alive(const uv_loop_t* loop) {
return uv__has_active_handles(loop) || // loop->active_handles > 0
uv__has_active_reqs(loop) || // QUEUE_EMPTY(&(loop)->active_reqs) == 0
loop->closing_handles != NULL;
}
> uv_is_active()
int uv_is_active(const uv_handle_t* handle) {
return uv__is_active(handle); // (((h)->flags & UV__HANDLE_ACTIVE) != 0)
}
> uv_is_closing
int uv_is_closing(const uv_handle_t* handle) {
return uv__is_closing(handle); // (((h)->flags & (UV_CLOSING | UV_CLOSED)) != 0)
}
libuv-v1.11.0 - 源码剖析 - 02 - uv_timer_t
> uv_timer_t
struct {
// UV_HANDLE_FIELDS (uv.h)
void *data;
uv_loop_t *loop; /* 所属的uv_loop */
uv_handle_type type; /* handle的类型,对于uv_timer_t初始化为UV_TIMER */
uv_close_cb close_cb; /*
uv_close()时设置的回调。
uv_loop内部执行uv__finish_close(handle)时执行回调。
*/
void* handle_queue[2]; /*
用于插入loop->handle_queue的队列节点。
uv_timer_init()时被追加到loop->handle_queue尾部。
uv_loop内部执行uv__finish_close(handle)时,\
handle从loop->handle_queue出队。
*/
union {
int fd;
void *reserved[4];
} u;
// UV_HANDLE_PRIVATE_FIELDS (uv-unix.h)
uv_handle_t* next_closing; /*
下一个待close的handle。
uv_handle_init()后为NULL。
uv_close()后:
指向loop->closing_handles, loop->closing_handles指向此。
*/
unsigned int flags; /*
可取值:
UV__HANDLE_CLOSING (UV_CLOSING),
UV__HANDLE_REF,
UV__HANDLE_ACTIVE,
UV__HANDLE_INTERNAL,
UV_CLOSED。
uv_handle_init()后为UV__HANDLE_REF。
uv__handle_start()时 |=UV__HANDLE_ACTIVE。
uv__handle_stop()时 &=~UV__HANDLE_ACTIVE。
uv_close()时 |=UV_CLOSING。
uv_loop内部执行uv__finish_close(handle)时,\
|= UV_CLOSED, &=~UV__HANDLE_REF。
*/
// UV_TIMER_PRIVATE_FIELDS (uv-unix.h)
uv_timer_cb timer_cb; /*
定时器回调。
uv_timer_init()后为NULL。
uv_timer_start()时设置。
*/
void* heap_node[3]; /*
时间堆节点。
在uv_timer_start()时会将此节点插入loop->timer_heap。
在uv_timer_stop()时会将此节点从loop->timer_heap中移除。
uv_loop内部执行uv__run_timers()时从时间堆中\
取出已超时的uv_timer_t,执行其回调。
*/
uint64_t timeout; /*
超时时间。
uv_timer_init()时不改变。
uv_timer_start()时设置值。
uv_loop内部执行uv__run_timers()时,\
通过比较timeout和loop->time来判断定时器超时。
*/
uint64_t repeat; /*
定时器repeat时间间隔。0表示不repeat。
uv_timer_init()后为0。
uv_timre_start()时设置。
uv_loop内部的uv__run_timers()每当触发一次定时器回调后,\
通过判断repeat!=0来决定是否以repeat作为超时再次启动定时器。
*/
uint64_t start_id; /*
start_id is the second index to be compared in \
uv__timer_cmp()
*/
};
> uv_check_t、uv_prepare_t 的实现与uv_idle_t相同。
libuv-v1.11.0 - 源码剖析 - 03 - uv_idle_t
> uv_idle_t
struct {
// UV_HANDLE_FIELDS (uv.h)
void *data;
uv_loop_t* loop;
uv_handle_type type; /* handle的类型,对于uv_idle_t初始化为UV_IDLE */
uv_close_cb close_cb; /*
uv_close()时设置的回调。
uv_loop内部执行uv__finish_close(handle)时执行回调。
*/
void *handle_queue[2]; /*
用于插入loop->handle_queue的队列节点。
uv_idle_init()时被追加到loop->handle_queue尾部。
uv_loop内部执行uv__finish_close(handle)时,\
handle从loop->handle_queue出队。
*/
union {
int fd;
void *reserved[4];
} u;
// UV_HANDLE_PRIVATE_FIELDS (uv-unix.h)
uv_handle_t* next_closing; /*
下一个待close的handle。
uv_handle_init()后为NULL。
uv_close()后:
指向loop->closing_handles, loop->closing_handles指向此。
*/
unsigned int flags; /*
可取值:
UV__HANDLE_CLOSING (UV_CLOSING),
UV__HANDLE_REF,
UV__HANDLE_ACTIVE,
UV__HANDLE_INTERNAL,
UV_CLOSED。
uv_idle_init()后为UV__HANDLE_REF。
uv__handle_start()时 |=UV__HANDLE_ACTIVE。
uv__handle_stop()时 &=~UV__HANDLE_ACTIVE。
uv_close()时 |=UV_CLOSING。
uv_loop内部执行uv__finish_close(handle)时,\
|= UV_CLOSED, &=~UV__HANDLE_REF。
*/
// UV_IDLE_PRIVATE_FIELDS (uv-unix.h)
uv_idle_cb idle_cb; /*
idle的回调
uv_idle_init()时置空
uv_idle_start()时置值。
*/
void *queue[2]; /*
uv_idle_start()时将此queue挂接到loop->idle_handles队首
uv_idle_stop()时出队。
uv_loop每次loop iteration会遍历idle_handles中\
的idle并执行其idle_cb。
*/
};
libuv-v1.11.0 - 源码剖析 - 04 - uv_async_t
> uv__io_t
struct uv__io_s {
uv__io_cb cb; /* io事件回调, event类型:POLLIN | POLLOUT | UV__POLLRDHUP */
void* pending_queue[2];
void* watcher_queue[2];
unsigned int pevents; /* Pending event mask i.e. mask at next tick. */
unsigned int events; /* Current event mask. */
int fd; /* 监听的fd */
UV_IO_PRIVATE_PLATFORM_FIELDS
};
> struct uv__async
struct uv__async {
uv__async_cb cb; /* 当从io_watcher.fd读取到数据后,调用该回调 */
uv__io_t io_watcher; /*
监听io_watcher.fd的读取事件,读取到数据后,触发上面的cb回调。
uv__async_init()设置io_watcher.fd = -1。
uv__async_start()设置io_watcher.fd = 管道读端fd。
*/
int wfd; /*
管道写端fd。uv_async_send()会向其中写入一个字节。
uv__async_init()设置wfd = -1。
uv__async_start()设置wfd = 管道写端fd。
*/
};
> uv_async_t
struct {
// UV_HANDLE_FIELDS (uv.h)
void *data;
uv_loop_t *loop;
uv_handle_type type;
uv_close_cb close_cb;
void* handle_queue[2]; /*
队列节点。该节点将在uv__handle_init()时挂接\
到loop->handle_queue尾
*/
union {
int fd;
void *reserved[4];
} u;
// UV_HANDLE_PRIVATE_FIELDS (uv-unix.h)
uv_handle_t* next_closing;
unsigned int flags;
// UV_ASYNC_PRIVATE_FIELDS (uv-unix.h)
uv_async_cb async_cb; /*
异步事件回调。
uv_async_init()设置该回调。
*/
void* queue[2]; /*
队列节点。该节点将在uv_async_init()时挂接到\
loop->async_handles尾。
*/
int pending; /*
等待回调异步事件的标志。
初始为0。
uv_async_send()设其为1。
事件回调后恢复为0。
*/
};
libuv-v1.11.0 - 源码剖析 - 05 - uv_req_t
> uv_getaddrinfo_t
struct {
// UV_REQ_FIELDS (uv.h)
void *data;
uv_req_type type; /* 对于uv_getaddrinfo_t,此处初始化为UV_GETADDRINFO。 */
void* active_queue[2]; /*
队列节点。用于挂接到loop->active_reqs队列。
uv__req_init()调用的uv__req_register()中将此节点添加到loop->active_reqs尾部。
uv__getaddrinfo_done()调用的uv__req_unregister()中将req从队列移除。
*/
void* reserved[4];
//UV_REQ_PRIVATE_FIELDS
// UV_GETADDRINFO_PRIVATE_FIELDS (uv-unix.h)
struct uv__work work_req; /*
work_req->wq字段可以挂接到全局work队列中,\
threadpool中的线程从该队列中互斥取work_req,\
并执行work_req->work()。\
执行完work()函数后,将uv__work转移到loop->wq队列中,\
并向loop->wq_async发送异步事件,\
loop->async的回调函数uv__work_done(),\
uv__work_done()遍历loop->wq队列并执行各元素的done()函数。\
done()函数中会执行uv_getaddrinfo_cb。
*/
uv_getaddrinfo_cb cb;
struct addrinfo* hints;
char* hostname;
char* service;
struct addrinfo* addrinfo;
int retcode;
};
> uv_write_t
struct {
// UV_REQ_FIELDS (uv.h)
void *data;
uv_req_type type;
void* active_queue[2]; /*
uv_write()时挂接到loop->active_ques队列。
write完成之后,在uv__write_callbacks()中\
把req从loop->active_ques移除。
uv__stream_destroy()函数内部会把未完成的uv_write强制结束,\
并执行uv__write_callbacks()从loop->active_ques中移除。
*/
void* reserved[4];
// UV_REQ_PRIVATE_FIELDS
uv_write_cb cb;
uv_stream_t* send_handle;
uv_stream_t* handle;
// UV_WRITE_PRIVATE_FIELDS (uv-unix.h)
void* queue[2]; /*
uv_write()时挂接到stream->write_queue队列。 \
当write完成后转移到stream->write_completed_queue队列,\
然后调用uv__write_callbacks(),uv__write_callbacks()取出\
stream->write_completed_queue队列中的req并执行回调。
*/
unsigned int write_index;
uv_buf_t* bufs;
unsigned int nbufs;
int error;
uv_buf_t bufsml[4];
};