1. NXLog 简介
nxlog 是用 C 语言写的一个开源日志收集处理软件,它是一个模块化、多线程、高性能的日志管理解决方案,支持多平台。今天我主要分析一下 nxlog 的启动流程,基于的 code 版本是 nxlog-ce-2.8.1248
。
2. NXLog 启动流程图
上图是 nxlog 启动的一个大致流程图,大家可以先看一眼,对整个流程有个大致认识,具体的解析下面奉上。
3. NXLog 启动详解
下面根据启动流程的各个步骤分别进行分析。
3.1 NXLog Init
nxlog->ctx = nx_ctx_new();
nx_ctx_register_builtins(nxlog->ctx);
CHECKERR(apr_thread_mutex_create(&(nxlog->mutex), APR_THREAD_MUTEX_UNNESTED, nxlog->pool));
CHECKERR(apr_thread_cond_create(&(nxlog->event_cond), nxlog->pool));
CHECKERR(apr_thread_cond_create(&(nxlog->worker_cond), nxlog->pool));
nxlog_init 主要干了三件事情。
- 通过 nx_ctx_new 接口创建一个configuration context。ctx 很重要,主要用来存储 nxlog 的配置,module,jobgroups 等信息。
- 通过 nx_ctx_register_builtins 接口绑定 IN, OUT 以及 CORE 模块的 callback。下面以 IN 模块为例来进行说明。
static void nx_ctx_register_builtin_inputfuncs()
{
nx_module_input_func_register(NULL, "linebased",
&nx_module_input_func_linereader, NULL, NULL);
nx_module_input_func_register(NULL, "dgram",
&nx_module_input_func_dgramreader, NULL, NULL);
nx_module_input_func_register(NULL, "binary",
&nx_module_input_func_binaryreader, NULL, NULL);
}
Input module 提供三种 callback。根据数据源的不同分为 linebased,dgram,binary
三种方式。linebased 代表log一行一行切分的,适用于读取日志文件。dgram 代表log是一个包一个包来切分的,适用于接收 UDP syslog 消息。binary 代表数据源为二进制流。nxlog 的配置文件里每个 module 下面有个 InputType 选项,我们可以把它分别配置成 LineBased,Dgram,Binary 来实现相应的功能。
- 通过
apr_thread_mutex_create
,apr_thread_cond_create
创建互斥锁
(nxlog->mutex) 和条件变量
(nxlog->event_cond 和 nxlog->worker_cond)。这里我们看到 nxlog 有使用APR(Apache Portable Runtime)
的接口,在 nxlog 里很多涉及到平台相关的代码都是调用 APR 的接口来实现的,以此来实现跨平台。
互斥锁,是一种信号量,常用来防止两个进程或线程在同一时刻访问共享资源。条件变量(Condtion Variable)是在多线程程序中用来实现“等待” -> “唤醒”逻辑的常用方法。
NXLog 使用的是生产者消费者模式
,该模式需要有一个缓冲区处于生产者和消费者之间,生产者把数据放入缓冲区,而消费者从缓冲区取出数据。这个缓冲区在 NXLog 里叫做 jobqueue,jobqueue属于共享资源,我们需要使用 nxlog->mutex 对多个线程进行同步,防止多个线程同时访问 jobqueue 而导致数据出错。
既然是生产者消费者模式,那就有可能出现生产者快于消费者或者消费者快于生产者的情况。当生产者快于消费者时,我们首先想到的是 jobqueque 会不会满,事实上 jobqueue 是用一个双向链表来实现的,它只是负责把各种 event 链在一起,本身并不维护一块内存空间,真正消耗内存的是 event 本身,所以理论上只要 event 能创建就没有问题。但是如果程序经常出现这种情况,那就代表处理能力不足,影响运行效率。为了不出现这种问题,消费者这一块 nxlog 是用线程池来实现的(在后面 Create threads 我们再细说),这也就是它号称多线程,高性能的原因。
下面再来看看当消费者快于生产者的情况,出现这种情况后,我们就发现 jobqueue 是空的,这时我们不希望消费者忙等而消耗资源,而是希望他们睡眠,当有新的 event 产生时再唤醒消费者。这就需要上面我们说到的条件变量,消费者调用apr_thread_cond_wait
睡眠,当生产者有消息时调用apr_thread_cond_signal
来唤醒消费者。
3.2 NXLog parse configuration
NXLog 的配置分为两部分,一部分通过命令行参数的方式带进来,不过这部分配置很少,大多数配置还是通过配置文件的方式下发。
- Command line
命令行就没有什么好说的了, 主要是用来指定运行方式以及配置文件路径等。
static const apr_getopt_option_t options[] = {
{ "help", 'h', 0, "print help" },
{ "foreground", 'f', 0, "run in foreground" },
{ "stop", 's', 0, "stop a running instance" },
{ "reload", 'r', 0, "reload configuration of a running instance" },
{ "conf", 'c', 1, "configuration file" },
{ "verify", 'v', 0, "verify configuration file syntax" },
{ NULL, 0, 1, NULL },
};
- Config File
配置文件会被解析成一个config tree,nxlog 通过递归调用 nx_cfg_parse 接口对配置文件进行逐行(如果在行尾有反斜杠 '', 会把多行并作一行)分析,最后返回一个 cfgtree,这个 tree 中的每个节点都保存了一行配置中的指令和参数,同时它还会保存父节点,子节点以及兄弟节点的地址。
这样就把配置文件保存在数据结构 cfgtree 中,最后在每个 module 启动之前会调用这个 module 的 config 接口,这个接口遍历该模块缓存在 cfgtree 中的配置,然后使能这些配置,这样 config file 的使命就完成了。关于 config file 的写法以及每个 module 的配置选项有很多,在此我就不赘述,不清楚的朋友可以参考 doc/reference-manual 目录,在 config-examples 目录下有一些配置模板,在 en 目录下有参考手册nxlog-reference-manual.pdf
,这里面关于 config file 讲的很详细。
3.3 Read config cache
Config cache 里保存了 nxlog 上次采集的位置,比如文件的话它会记录文件的名字以及最后一次的 offset,这样当 nxlog 重启后它就知道上次采集到了那里,然后沿着上一次采集的位置继续采集,这样就避免了重复采集。针对日志采集,我们比较忌讳的有两点,一是丢数据,二就是重复采集,这两种情况都会导致采集上来的日志和原来的日志不一致。
Nxlog 在启动的时候调用 nx_config_cache_read 接口,把 configcache.dat
文件里保存的信息读出并保存在 config_cache 数据结构中,采集文件的模块启动时候会在 config_cache 里查找有没有某个文件的数据,如果不存在就从文件的开头或者末尾开始读取,如果存在就调用 apr_file_seek 移动文件指针到上次读取的位置,当 nxlog 要退出的时候将更新 config_cache 到文件最新的读取位置,然后调用 nx_config_cache_write 将 config_cache 回写到 configcache.dat 文件中。
3.4 Add & config modules
Nxlog 会首先遍历 cfgtree, 针对所有的 module(共有4种, input, output, processor, extension
) 分别调用 nx_module_add 接口。该接口会调用 nx_module_new 创建一个 module,然后再调用 nx_module_load_dso 绑定该 module 的方法,等一些必要的初始化都完成后会将该 module 放到 ctx->modules 链表中统一管理。
if ( strcasecmp(curr->directive, "input") == 0 )
{
if ( curr->first_child == NULL )
{
nx_conf_error(curr, "empty 'Input' block");
}
nx_module_add(ctx, curr->first_child, curr->args, NX_MODULE_TYPE_INPUT);
}
else if ( strcasecmp(curr->directive, "processor") == 0 )
{
if ( curr->first_child == NULL )
{
nx_conf_error(curr, "empty 'Processor' block");
}
nx_module_add(ctx, curr->first_child, curr->args, NX_MODULE_TYPE_PROCESSOR);
}
else if ( strcasecmp(curr->directive, "output") == 0 )
{
if ( curr->first_child == NULL )
{
nx_conf_error(curr, "empty 'Output' block");
}
nx_module_add(ctx, curr->first_child, curr->args, NX_MODULE_TYPE_OUTPUT);
}
else if ( strcasecmp(curr->directive, "extension") == 0 )
{
if ( curr->first_child == NULL )
{
nx_conf_error(curr, "empty 'Extension' block");
}
nx_module_add(ctx, curr->first_child, curr->args, NX_MODULE_TYPE_EXTENSION);
}
接着 NXLog 会遍历 ctx->modules 链表,分别调用每个 module 的 config 方法对该 module 进行配置,然后会解析该 module 的 “Exec” 以及 “Schedule” 配置。Nxlog 确实比较强大,不仅能采集各种类型的日志,而且还能通过 “Exec” 执行一些任务,比如日志过滤等,�还能通过 “Schedule” 做一些任务调度,比如定期进行日志归档等。
ASSERT(module->status == NX_MODULE_STATUS_UNINITIALIZED);
if ( module->decl->config != NULL )
{
module->decl->config(module);
}
else
{
nx_module_empty_config_check(module);
}
module->exec = nx_module_parse_exec_block(module, module->pool, module->directives);
nx_module_parse_schedule_blocks(module);
3.5 Init modules
NXLog 会遍历 ctx->modules 链表,得到每个 module,然后调用 nx_module_init 接口对该 module 进行初始化。nx_module_init 也没干什么事情,只是调用该 module 的 init 方法。
for ( module = NX_DLIST_FIRST(ctx->modules);
module != NULL;
module = NX_DLIST_NEXT(module, link) )
{
try
{
nx_module_init(module);
if ( module->type == NX_MODULE_TYPE_INPUT )
{
num_input++;
}
}
3.6 Init routes and jobs
Route 顾名思义就是路由,它定义数据从哪个 INPUT 模块采集,经过哪个 PROCESSOR(不是必须的) 模块处理,送到哪个 OUTPUT 模块。Route 包含两个配置,一个是 “Path”
,它定义了数据的流动方向。第二个是 “Priority”
,它定义了该 route 的优先级,路由会将自己的优先级赋值给 path 中的各个 module。
while ( curr != NULL )
{
if ( strcasecmp(curr->directive, "route") == 0 )
{
if ( nx_add_route(ctx, curr, curr->args) == TRUE )
{
num_routes++;
}
}
curr = curr->next;
}
Nxlog 维护一个 jobgroups
链表,该链表按照优先级顺序存放着许多 jobgroup,相同优先级的 module 会把他们的 job 挂到同一个 jobgroup->jobs 上面。worker_thread 会优先从 priority 高的 jobgroup 里取 job,最终演变下来就是 priority 高的 module 的 event 会优先得到处理。
ASSERT(ctx != NULL);
ctx->jobgroups = apr_palloc(ctx->pool, sizeof(nx_jobgroups_t));
NX_DLIST_INIT(ctx->jobgroups, nx_jobgroup_t, link);
for ( module = NX_DLIST_FIRST(ctx->modules);
module != NULL;
module = NX_DLIST_NEXT(module, link) )
{
jobgroup = nx_ctx_get_jobgroup(ctx, module->priority);
job = apr_pcalloc(ctx->pool, sizeof(nx_job_t));
NX_DLIST_INSERT_TAIL(&(jobgroup->jobs), job, link);
module->job = job;
}
3.7 Create threads
NXLog 会创建一个 event_thread 和多个 worker_thread。究竟会创建几个 worker_thread 由 num_worker_thread 变量来决定,下面来说说这个 num_worker_thread。
如果配置文件里配置了 "Threads"
,num_worker_thread
将会被设置成该值。如果没有配置,nxlog 会根据配置文件里配置的 module 的个数以及所有 module 的 pollset 个数计算得到 num_worker_thread,具体算法可以参见 nxlog_create_threads 函数,在此我就不赘述。当配置文件里配了很多 module 的时候,这个数值有可能会大于 CPU core 的数量,这时可能会有人认为这是没用的,因为这些 threads 不可能同时得到执行,而且会增加系统的工作量(内存,调度的开销)。在我看来这要分情况,如果这些 worker_thread 从事的是 CPU 密集型
的工作,我觉得这种观点是正确的。但是如果这些 worker_thread 从事的是 IO 密集型
的工作,这种观点就值得推敲了,IO 密集型的工作一个显著的特点是 Thread 在 IO 不 Ready 的情况下会睡眠,这样即使你起的 thread 数量超过了 CPU core 的数量,但由于很多 thread 都处于睡眠状态,真正执行的 thread 并不多。针对 IO 密集型的工作,增加 thread 数量是能显著提升性能的,但也不是越多越好,当 thread 数量太多的时候反而会走向另一个极端。
event_thread
主要是处理延时的 event,它会从 ctx->events 中依次取出 event, 当该 event 的 time 已经超时了 event_thread 会把该 event 交给 worker_thread 来处理。如果没有超时会得到离当前时间最近的一个 event 要超时的时间,然后调用 apr_thread_cond_timedwait sleep 这么长的时间,等醒来后再去处理这个 event,当然在 sleep 的这段时间内如有新的 event 产生会调用 apr_thread_cond_signal(nxlog->event_cond) 唤醒 event_thread。由于 event_thread 干的事情不多,因此只需要一个。
worker_thread
先通过 nx_ctx_next_job 接口获取一个 event,如果没有 event 可处理,worker_thread 会调用 apr_thread_cond_wait(nxlog->worker_cond, nxlog->mutex) 睡眠,等有新的 event 产生时会调用 apr_thread_cond_signal(nxlog->worker_cond) 把它唤醒。如果有 event 会调用 nx_event_process 来处理。nx_ctx_next_job 获取 event 有一定的讲究,他会优先获取 priority 高的 module 的 event,它是通过优先遍历 priority 高的 jobgroup 来实现的,前面创建 jobgroups 的时候我们就说过,nxlog 会把不同 priority 的 module 的 job 放到不同 priority 的 jobgroup 中。
nxlog->worker_threads = apr_palloc(nxlog->pool, sizeof(apr_thread_t *) * nxlog->num_worker_thread);
nxlog->worker_threads_running = apr_pcalloc(nxlog->pool, sizeof(uint32_t) * nxlog->num_worker_thread);
log_debug("spawning %d worker threads", nxlog->num_worker_thread);
for ( i = 0; i < nxlog->num_worker_thread; i++)
{
nx_thread_create(&(nxlog->worker_threads[i]), NULL, nxlog_worker_thread, NULL, nxlog->pool);
}
nx_thread_create(&(nxlog->event_thread), NULL, nxlog_event_thread, NULL, nxlog->pool);
3.8 Start modules
NXLog 会遍历 ctx->modules 链表,得到每个 module,然后调用 nx_module_start 启动该 module。nx_module_start 会发送 NX_EVENT_MODULE_START event 给 worker_thread,worker_thread 收到该 event 后会调用 nx_module_start_self,绕了一圈后真正干活的才出现,nx_module_start_self 会首先获取该 module 的状态,如果是 NX_MODULE_STATUS_STOPPED 会调用该 module 的 start 接口,然后把它的状态置成 NX_MODULE_STATUS_RUNNING。
ASSERT(nx_module_get_status(module) == NX_MODULE_STATUS_STOPPED);
if ( module->decl->start != NULL )
{
module->decl->start(module);
}
nx_module_add_scheduled_events(module);
nx_module_set_status(module, NX_MODULE_STATUS_RUNNING);
3.9 Main loop
nxlog_mainloop 没干什么事情,一直调用 apr_sleep(NX_POLL_TIMEOUT),把 CPU 让给 event_thread 和 worker_threads。
当 nxlog 收到 SIGTERM, SIGINT, SIGQUIT
这三个信号中的任何一个时,terminate_request 会被置成 TRUE,nxlog_mainloop 结束,Nxlog 调用 nxlog_exit 执行 stop/shutdown module, 结束 event_thread, worker_threads, 回写 config cache,remove pidfile,回收资源等动作,最后 Nxlog 进程退出。