目标
这里我们实现一个简单的demo工程,展示auditbeat
如何实现一个module,并把数据发送到ElasticSearch,工程源码可以在Github上下载
- 创建一个
auditbeat
Module -
auditbeat
Module周期性向ElasticSearch发送含有Host Name的心跳数据包
Demo工程基于
elastic-beats
8.3.3版本,早期版本原理类似
概念
-
Module
Module就是
auditbeat
中具体实现一个功能的模块,auditbeat
依赖metricbeat
和libbeat
提供的API,可以很快的开发一个模块去采集数据并存储到ElasticSearch中 -
MetricSet
MetricSet是
auditbeat
的基本单位,一个MetricSet泛指收集的一种数据类型,一个auditbeat
Module可以包含多个MetricSet,有些简单的Module只含有一个MetricSet。例如,
auditbeat
的system
Module中包含有login
,user
,socket
,process
等MetricSet,auditd
Module中只有一个MetricSet -
Fields
Fields是一个MetricSet产生的数据结构映射,
auditbeat
会将Fields通过auditbeat setup
命令将Fields导入到index template中,以便在Kibana中展示 -
Event
Event是
auditbeat
中产生的包含数据的事件,由MetricSet产生,需要符合声明的Fields映射规范,并最终存储到ElasticSearch中
创建Module
- 初始化Demo Module目录结构在路径
auditbeat/module/zack_module
auditbeat/module/zack_module/ ├── heartbeat │ ├── event.go │ ├── _meta │ │ └── fields.yml │ └── metricset.go ├── _meta │ ├── data.json │ └── fields.yml ├── zack.go └── zack_metric_set.go
_meta
路径很重要,一定要创建,开发elastic-beats
的很多工具会依赖这个路径 - 在
auditbeat/module/zack_module/zack.go
文件,声明Module如下package zack_module import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-sysinfo" ) const moduleName = "zack" func init() { if err := mb.Registry.AddModule(moduleName, NewModule); err != nil { panic(err) } } type ZackModule struct { mb.BaseModule hostName string } func NewModule(base mb.BaseModule) (mb.Module, error) { log := logp.NewLogger(moduleName) var hostName string if hostInfo, err := sysinfo.Host(); err != nil { log.Errorf("Could not get host info. err=%+v", err) } else { hostName = hostInfo.Info().Hostname } if hostName == "" { log.Warnf("Could not get host name") } return &ZackModule{ BaseModule: base, hostName: hostName, }, nil }
-
auditbeat
中要在init
方法中将Module通过mb.Registry.AddModule
方法注册给metricbeat
- Module实现都要继承
mb.BaseModule
-
创建MetricSet
- 文件
auditbeat/module/zack_module/zack_metric_set.go
中声明MetricSetpackage zack_module import "github.com/elastic/beats/v7/metricbeat/mb" type ZackMetricSet struct { mb.BaseMetricSet module *ZackModule } func NewZackMetricSet(base mb.BaseMetricSet) ZackMetricSet { return ZackMetricSet{ BaseMetricSet: base, module: base.Module().(*ZackModule), } } func (ms *ZackMetricSet) Hostname() string { return ms.module.hostName }
auditbeat
的MetricSet实现需要继承mb.BaseMetricSet
- 声明Event在文件
auditbeat/module/zack_module/heartbeat/event.go
package heartbeat import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" ) type HeartBeatEvent struct { HostName string } func (e *HeartBeatEvent) buildHeartBeatEvent() mb.Event { data := mapstr.M{ "host_name": e.HostName, } return mb.Event{ RootFields: mapstr.M{ "heartbeat": data, }, } }
- 在MetricSet
auditbeat/module/zack_module/heartbeat/metricset.go
路径中,添加MetricSet实现package heartbeat import ( "github.com/elastic/beats/v7/auditbeat/module/zack_module" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/logp" ) const ( moduleName = "zack" metricSetName = "heartbeat" namespace = "." ) type MetricSet struct { zack_module.ZackMetricSet // We can add config or other attibutes implementation here for HeartBeat MetricBeat log *logp.Logger } func init() { mb.Registry.MustAddMetricSet(moduleName, metricSetName, New, mb.DefaultMetricSet(), mb.WithHostParser(parse.EmptyHostParser), mb.WithNamespace(namespace)) } func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ ZackMetricSet: zack_module.NewZackMetricSet(base), log: logp.NewLogger(metricSetName), }, nil } func (ms *MetricSet) Fetch(reporter mb.ReporterV2) { event := HeartBeatEvent{ HostName: ms.ZackMetricSet.Hostname(), } ms.log.Info("sending heartbeat event") reporter.Event(event.buildHeartBeatEvent()) }
这里我们实现了
Fetch
接口来周期性获取数据,默认internal为10s,我们还可以使用Run
接口来一次性触发任务
注册Zack Module到auditbeat
中
-
通过工具
/dev-tools/cmd/module_include_list/module_include_list.go
注册我们开发的Module和MetricSet,VSCodelaunch.json
示例如下{ "name": "[Dev] Module Include", "type": "go", "request": "launch", "mode": "auto", "cwd": "${workspaceFolder}", "program": "${workspaceFolder}/dev-tools/cmd/module_include_list/module_include_list.go", "console": "integratedTerminal", "args": [ "-moduleDir=auditbeat/module", "-out=auditbeat/include/list.go" ] }
运行后查看
auditbeat/include/list.go
是否引入了我们最新开发的Module和MetricSet
如果没有引入,请检查Module和MetricSet路径是否包含_meta
路径 -
编辑
fields.yml
,声明heatbeat.host_name
映射为keyword
- key: zack title: "Zack" decription: > These are fields generated by the insight module. fields: - name: heartbeat type: group fields: - name: host_name type: keyword
-
通过工具
/dev-tools/cmd/asset/asset.go
生成Fields映射,生成auditbeat/module/zack_module/fields.go
文件,VSCode launch.json示例如下:{ "name": "[Dev] Generate Fields", "type": "go", "request": "launch", "mode": "auto", "cwd": "${workspaceFolder}", "program": "${workspaceFolder}/dev-tools/cmd/asset/asset.go", "console": "integratedTerminal", "args": [ "-in=auditbeat/module/zack_module/_meta/fields.yml", "-out=auditbeat/module/zack_module/fields.go", "-name=zack", "-priority=asset.ModuleFieldsPri", "-pkg=zack_module", "auditbeat" ] }
-
启用Module和MetriSet,编辑
auditbeat.yml
如下:auditbeat.modules: - module: zack metricsets: - heartbeat
验证
-
通过
auditbeat setup
命令更新index template,VSCodelaunch.json
如下{ "name": "AuditBeat Setup", "type": "go", "request": "launch", "mode": "auto", "program": "${workspaceFolder}/auditbeat/main.go", "cwd": "${workspaceFolder}", "console": "integratedTerminal", "asRoot": true, "args": [ "setup", "-c", "path to auditbeat.yml", "-e" ] }
-
Kinana上验证Fields成功导入如下
- 启动
auditbeat
,VSCodelaunch.json
如下{ "name": "AuditBeat", "type": "go", "request": "launch", "mode": "auto", "program": "${workspaceFolder}/auditbeat/main.go", "console": "integratedTerminal", "asRoot": true, "args": [ "-c", "<path to auditbeat.yml>", "-e" ] }
-
验证Module数据成功导入ElaticSearch