1. Nxlog 模块简介
在之前的文章当中我已经提到过 Nxlog 的 module 共分为4种,它们分别是 input, output, processor, extension
。 Input module 负责从各种数据源(如 file, archive, tcp, udp等)中采集数据,Output module 定义了该如何处理采集的数据,我们可以以文件的方式保存下来,也可能通过 tcp,udp 发出去。Processor module 负责对采集的数据进行处理,比如做一些内容过滤,格式转换之类的。Extension module 主要是用来扩展读写数据的接口,它可以针对特定的数据类型进行特定的处理,比如多行合并,将csv文件解析成不能的fields等,我的解释可能不太准确,官方的说法是:'These modules can enhance the features of nxlog in different ways, such as exporting new functions and procedures, registering additional I/O reader and writer functions to be used with modules supporting the OutputType and InputType directives'。
这里面 input 和 output 模块是必不可少的,processor 和 extension 可有可无。下面我以 im_file
模块为例来讲解一下每个模块的工作流程。im_file 是个 Input 模块,它主要是用来采集文件的。
2. Module 状态机
说到工作流程,那不得不提 状态机(FSM, Finite State Machines)
,通过状态机可以帮我们很好的理解 module 的工作过程。
如上图所示,每个 Module 总共有 4 种状态,它们分别是
UNINITIALIZED, STOPPED, RUNNING, PAUSED
。每个 Module 创建成功之后会通过 nx_module_config 调用自己 config 接口加载配置,这个时候还处在 UNINITIALIZED 状态。然后通过 nx_module_init 调用自己 init 接口进行初始化,状态转换为 STOPPED。接着通过 nx_module_start_shelf 调用自己 start 接口启动该 module,状态转换为 RUNNING。这时模块就开始产生各种 event 来工作,在这个过程中它的状态也可能会变为 PAUSED,比如当 im_file 模块采集了很多数据而 Output 模块又没有及时处理时,它的状态会变成 PAUSED,等缓存的数据被消费后它的状态会再变为 RUNNING。
当进程要退出时,会通过 nx_module_stop_self 接口调用各个模块的 stop 接口,状态转换为 STOPPED,接着通过 nx_module_shutdown_self 调用 shutdown 接口回归到最初的 UNINITIALIZED 状态。
3. im_file 详解
接下来我详细讲一下 im_file 的配置,启动,工作,停止流程。
3.1 im_file 配置
每个模块在启动的过程中都要加载配置,针对 im_file 模块,比较重要的配置有以下这些。
- File
指定需要采集的文件名,可以使用通配符。 - SavePos
指定是否需要保存文件的采集位置,以防止下一次启动后重采,默认是 TRUE。 - ReadFromLast
指定是否从文件末尾处开始采集。如果是 TRUE,就从文件尾开始采集,忽略存量数据。如果是 FALSE,要看保存的是否有文件位置,如果有就从上一次采集后的地方开始采集,如果没有就从文件首开始采集,默认是 TRUE。
3.2 im_file 启动
im_file_check_new(module, imconf->readfromlast);
im_file_add_poll_event(module, FALSE);
im_file_add_dircheck_event(module, FALSE);
start 接口主要干了 3 件事。
- 将配置文件中 File 指定的所有文件打开(当文件数目大于 ActiveFiles 时,其它的文件会关闭),然后把所有的 file 结构体保存在 imconf->files 哈希表中,其中 key 是文件名,value 是 file 结构体地址。当文件个数很多时,这个过程会很慢。
- 产生一个 poll event,这个 event 是用来读取所有 open_files 的数据。
- 产生一个 dircheck event, 这个 event 是用来检查有没有未处理的文件以及已处理的文件有没有新数据可读。
3.3 im_file 工作
im_file 的工作主要是处理启动阶段产生的两个 event。其中 NX_EVENT_READ 就是上面的 poll event, NX_EVENT_MODULE_SPECIFIC 就是上面的 dircheck event。
static void im_file_event(nx_module_t *module, nx_event_t *event)
{
ASSERT(event != NULL);
switch ( event->type )
{
case NX_EVENT_READ:
im_file_read(module);
break;
case NX_EVENT_MODULE_SPECIFIC:
im_file_dircheck_event_cb(module);
break;
default:
nx_panic("invalid event type: %d", event->type);
}
}
上面已经介绍了这两个 event 的主要工作,下面再详细说一下他们的流程。
-
NX_EVENT_READ
首先从 open_files 哈希表中取出一个 file,然后读取数据放到 buf 里,接着将这些数据一行行提取出来产生很多 logdata,logdata 会首先按照 Exec 的配置进行处理,如果没有被 drop 就根据 Router 的配置查看需要发送到哪个 module (Processor 或者 Output),然后再看要发送的 module queue 的 size,当 queue 满了(size >= NX_LOGQUEUE_LIMIT) 会根据 FlowControl 的配置,要么 free 该 logdata(FlowControl disable),要么将 im_file module 的状态置成 PAUSE(FlowControl enable),从而不再产生新的 logdata,以达到流控的目的。当 queue 未满就将该 logdata 放到要发送的module 的 queue 里,最后向这个 module 发送一个 NX_EVENT_DATA_AVAILABLE event,提醒它处理该 logdata,这样每个 logdata 的处理流程就算走完了。当这个 file 里的数据被处理完后,会从 open_files 列表中取出下一个 file 来处理,如此循环往复。为了防止文件很大导致这个过程占用很长时间,在外面就加了一个限制,最多循环 IM_FILE_MAX_READ 次。
当这次 READ 执行完后会再产生一个 NX_EVENT_READ event,至于下一个 event 何时执行取决于 imconf->files 列表中是否有没有读取完的文件,如果有就立即执行,如果没有就等待 poll_interval 秒后执行。
-
NX_EVENT_MODULE_SPECIFIC
if ( im_file_check_new(module, FALSE) == TRUE ) { //log_info("dircheck_event_cb detected new files in check_new()"); got_data = TRUE; } if ( im_file_check_files(module, FALSE) == TRUE ) { //log_info("dircheck_event_cb detected new files in check_files()"); got_data = TRUE; }
首先通过 im_file_check_new 接口检查有没有未曾处理的文件,如果 File 配置指定了一个文件夹,并且 recursive 是 TRUE,该接口会遍历这个文件夹下所有的文件,包括子目录。im_file 启动过程中就干过这件事,那次是将所有的存量文件加入到 imconf->files 哈希表中,这次则是基于存量文件检查有没有新增文件。
新增文件检查完后,还要通过 im_file_check_files 接口查看已处理的文件有没有新数据可读,它会检查文件的 size 和偏移 filepos,当 size > filepos 代表有未处理的数据。当找到 active_files 个可读文件后,这个接口会退出,以免耗费太多时间。其实在处理 NX_EVENT_READ event 快结束的时候也干过这件事,不过那时是只检查 imconf->files,这次是连 open_files 也一起检查。
通过上面两步如果发现有新增文件或者有新数据可读,got_data 会被置成 TRUE,这时会立即产生一个 NX_EVENT_READ event 去读取数据。接着再产生一个 NX_EVENT_MODULE_SPECIFIC event,等待 dircheck_interval 秒后执行下一次检查。
3.4 im_file 停止
stop 就比较简单了,首先将所有的 open_files 关闭,然后销毁所有的 imconf->files 结构,释放内存,这时会判断 savepos 是否为 TRUE,如果是 TRUE 会将文件的 filepos 保存到 ctx->config_cache 结构中,最后等所有的 module 退出后会将 ctx->config_cache 写入磁盘。这样
nxlog 下次启动时就知道上一次文件采集到了什么位置,不会重采。