Recently I needed to consolidate multi-source heterogeneous data into Elasticsearch, and chose Logstash as the data-processing and synchronization service to sync the MongoDB data.
Environment
- ELK 8.2.2
- MongoDB
- logstash-input-mongodb plugin
Steps
1. Install the Logstash MongoDB plugin
Logstash does not support MongoDB out of the box, so the logstash-input-mongodb plugin has to be installed.
1.1 Online environment
Go to the Logstash installation directory and run:
./bin/logstash-plugin install logstash-input-mongodb
Once the installation succeeds, you can verify that the plugin is present with:
./bin/logstash-plugin list | grep "logstash-input-mongodb"
1.2 Offline / intranet environment
If the machine cannot reach the internet, installing with the method above fails with:
ERROR: Something went wrong when installing logstash-input-mongodb, message: execution expired
This requires a few extra steps.
First, obtain an offline installation package. There are two ways:
- On a machine with internet access, install the same version of Logstash, install the plugin online as above, then export it with:
./bin/logstash-plugin prepare-offline-pack logstash-input-mongodb
On success, the path of the exported archive is printed:
[test@localhost logstash]$ ./bin/logstash-plugin prepare-offline-pack logstash-input-mongodb
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Offline package created at: /usr/share/logstash/logstash-offline-plugins-8.2.2.zip
You can install it with this command `bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-8.2.2.zip`
- If your ELK stack is version 8.2.2, you can directly download the archive exported by the author (extraction code: jMCP)
Once you have the offline package, upload it to the target machine and install it with:
bin/logstash-plugin install file:///path/to/logstash-offline-plugins-8.2.2.zip
The plugin should now install successfully.
2. Write the Logstash configuration file
Create a new xxx.conf configuration file in the Logstash configuration directory, for example:
touch /etc/logstash/conf.d/logstash_mongo.conf
Create a directory to hold the SQLite database (the plugin uses this file as a cache, including the sync progress), then edit the config file:
mkdir /opt/logstash-mongodb
vim /etc/logstash/conf.d/logstash_mongo.conf
Start by filling in the three sections: input, filter, and output:
input {
mongodb {
uri => 'mongodb://user:password@127.0.0.1:27017/database_name?ssl=false'
placeholder_db_dir => '/opt/logstash-mongodb/'
placeholder_db_name => 'xxx.db'
collection => 'xxx'
batch_size => 50
}
}
filter {
}
output {
elasticsearch {
action => "index"
index => "mongo_log_data"
hosts => ["127.0.0.1:9200"]
}
stdout {
codec => rubydebug
}
}
All three sections are required; if any is missing, Logstash reports an error such as:
[ERROR] 2024-12-12 00:17:02.065 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 5, column 1 (byte 221) after ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:381:in `block in converge_state'"]}
If MongoDB requires a username and password, they must be included in the URI (if the password contains an @ character, encode it as %40); otherwise startup fails with:
[ERROR] 2024-12-12 01:03:27.197 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
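As an illustration (the credentials here are made up), a password of p@ssword would be written into the uri option of the input block like this:
uri => 'mongodb://user:p%40ssword@127.0.0.1:27017/database_name?ssl=false'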
3. Insert data into MongoDB
Make sure the MongoDB collection contains at least one document, otherwise Logstash fails with:
[INFO ] 2024-12-12 18:25:35.813 [[main]-pipeline-manager] mongodb - Registering MongoDB input
[ERROR] 2024-12-12 18:25:40.227 [[main]-pipeline-manager] javapipeline - Pipeline error {:pipeline_id=>"main", :exception=>#<NoMethodError: undefined method `[]' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:99:in `init_placeholder'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:113:in `get_placeholder'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:160:in `block in update_watched_collections'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:158:in `update_watched_collections'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:182:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:233:in `block in register_plugins'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:232:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:391:in `start_inputs'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:316:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:142:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/logstash_mongo.conf"], :thread=>"#<Thread:0x5a33d505 run>"}
[INFO ] 2024-12-12 18:25:40.231 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[ERROR] 2024-12-12 18:25:40.266 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
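A quick way to seed a test document is from the mongo shell; the collection name xxx matches the placeholder used in the config above, and the field values are arbitrary:
use database_name
db.xxx.insertOne({ "name": "test", "created_at": new Date() })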
4. Start Logstash
From the Logstash directory, start it with:
./bin/logstash -f /etc/logstash/conf.d/logstash_mongo.conf --debug
If the data structure is fairly simple and both the MongoDB and Elasticsearch connections work, you can now go straight to Kibana and view the data.
Since ELK 8.x is used here and it enables HTTPS by default, starting Logstash reports a certificate error:
PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
Because Elasticsearch here is reached directly by IP and has no HTTPS certificate, add options to the output section of the Logstash config file to skip certificate verification. The modified output is:
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["https://10.182.174.89:9200"]
    index => "scan_cuckoo_yara"
    user => "tip"
    password => "hstic@2020"
    ssl => true
    ssl_certificate_verification => false
  }
}
Both the ssl and ssl_certificate_verification options must be added; neither can be omitted.
5. Advanced Logstash configuration
If the MongoDB data structure is complex, the previous step will most likely fail to start, and additional configuration is needed.
Moreover, the data usually should not be copied into Elasticsearch unchanged; some transformations are needed as well, and these are implemented with Logstash's filter configuration.
5.1 Preserving MongoDB field hierarchy
When logstash-input-mongodb has no parse_method setting, it defaults to flatten, which expands every field.
For the following document:
{
"id": 123,
"user": {
"name": "test",
"department": "test"
}
}
the input plugin turns the data into:
{
"id": 123,
"user_name": "test",
"user_department": "test"
}
Deeply nested data is also fully expanded, with levels joined by underscores. Sometimes this is not what we want, and it is especially unfriendly to fields whose names already contain underscores. Reading the plugin's source code shows that it supports the following modes:
- flatten: expand all fields; this is the default
- dig: dig into the JSON data, expanding only the configured fields, at most two levels deep
- simple: expand nothing; only top-level fields are parsed, and nested structures are converted to strings as-is
Here we choose the more flexible dig mode: add parse_method => "dig" to the input configuration, and use dig_fields and dig_dig_fields to specify which nested fields to dig into. The resulting input is:
input {
mongodb {
codec => "json"
parse_method => "dig"
dig_fields => ["info", "target"]
dig_dig_fields => ["file"]
uri => "mongodb://test:test@127.0.0.1:27017/test"
placeholder_db_dir => "/home/test/logstash/db_dir"
placeholder_db_name => "logstash_sqlite.db"
collection => "test"
}
}
Here dig_fields lists the first-level fields to dig into and dig_dig_fields the second-level fields. The configuration above suits the following JSON:
{
"aaa: "aaa",
"info": {
"bbb": "bbb",
"ccc": "ccc"
},
"target": {
"ddd": "ddd",
"file": {
"name": "xxxx",
"size": 123,
"obj": {
"test": "test"
}
}
}
}
After digging, the data becomes:
{
"aaa: "aaa",
"info_bbb": "bbb",
"info_ccc": "ccc",
"target_ddd": "ddd",
"target_file_name": "xxxx",
"target_file_size": 123,
"target_file_obj": "{\"test\"=>\"test\"}"
}
5.2 Removing redundant fields
Add the following to the filter section to drop redundant fields:
mutate {
remove_field => ["host","@version","logdate","log_entry","@timestamp","_id"]
}
5.3 JSON conversion
For multi-level MongoDB data, the plugin converts nested structures directly into JSON strings. In Logstash you can add the following to the filter section to parse them back into objects:
json {
source => "process_tree"
target => "process_tree"
}
However, the plugin is written in Ruby and uses to_s to build that string, so keys and values in the resulting "JSON" are joined with => rather than the usual colon. We can patch the plugin source to use to_s only when the value is actually a String, and to_json otherwise. The source file lives under the Logstash installation directory at ./vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb; change the dig-parsing branch of its run function to:
elsif @parse_method == 'dig'
doc.each do |k, v|
if k != "_id"
if (@dig_fields.include? k) && (v.respond_to? :each)
v.each do |kk, vv|
if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
vv.each do |kkk, vvv|
if /\A[-+]?\d+\z/ === vvv
event.set("#{k}_#{kk}_#{kkk}",vvv.to_i)
else
event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
end
end
else
if /\A[-+]?\d+\z/ === vv
event.set("#{k}_#{kk}", vv.to_i)
elsif vv.is_a? String
event.set("#{k}_#{kk}",vv.to_s)
else
event.set("#{k}_#{kk}",vv.to_json)
end
end
end
else
if /\A[-+]?\d+\z/ === v
event.set(k,v.to_i)
elsif v.is_a? String
event.set(k,v.to_s)
else
event.set(k,v.to_json)
end
end
end
end
With this patch in place, parsing the JSON with Logstash's json filter works as expected.
5.4 Date conversion
If the source data contains date/time fields, starting Logstash directly may fail with:
Missing Converter handling for full class name=org.jruby.gen.RubyObject4, simple name=RubyObject4
Or it may not fail, but the time fields show up in Elasticsearch as strings in an unexpected format such as \"2024-12-09 20:16:09 UTC\". Ideally the converted fields should automatically be mapped to the date type when inserted into Elasticsearch.
First make sure the parse_method => "dig" setting from the previous step is in place, then add the date-conversion configuration to the filter:
mutate {
gsub => [
"info_started", "\"", "",
"info_ended", "\"", "",
"info_started", " UTC", "",
"info_ended", " UTC", ""
]
}
date {
match => [ "info_started", "yyyy-MM-dd HH:mm:ss" ]
}
date {
match => [ "info_ended", "yyyy-MM-dd HH:mm:ss" ]
target => "@timestamp"
}
Where:
- mutate modifies fields and gsub performs string substitution; here it strips the surrounding \" characters and removes the UTC suffix
- date parses timestamps; the second element of match is the format to match against, and if you want the parsed value written to a new field, set target (see the sketch after this list)
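For instance, to leave @timestamp untouched and write the parsed value to a separate field instead (start_time is an assumed field name, not from the original config), the date block could look like this:
date {
  match => [ "info_started", "yyyy-MM-dd HH:mm:ss" ]
  target => "start_time"
}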
5.5 Type conversion
The plugin's dig mode converts field values to strings, so for integer data you need a filter configuration that converts the type:
mutate {
convert => {
"info_duration" => "integer"
"info_id" => "integer"
"target_file_size" => "integer"
}
}
5.6 Renaming fields
Rename fields; this can also move a field from aaa to a nested aaa.bbb:
mutate {
rename => {
"info_started" => "start_date"
"target_file_sha1" => "[file_info][sha1]"
}
}
5.7 Adding fields
You can add fields to the Elasticsearch records:
mutate {
add_field => {
"engine" => "test"
}
}
5.8 Splitting one document into multiple Elasticsearch records
The source data contains array fields whose elements are JSON objects, and we want to split them out into separate Elasticsearch records stored in different indices. The standard filter plugins Logstash ships with are not enough for this, but you can write a Ruby script inside the filter to handle it.
For example, given this document:
{
"aaa": "aaa",
"datas": {
"zhangsan": [
{
"name": "zhangsan",
"details": {
"email": "zhangsan@163.com",
"department": "zhangjiakou"
}
}
],
"lisi": [
{
"name": "lisi",
"details": {
"email": "lisi@163.com",
"department": "zhangjiakou"
}
}
]
}
}
you can add the following to the filter:
ruby {
code => "
new_event_clone = event.clone
new_event_clone.remove('datas')
new_event = new_event_clone.to_hash
data_result = event.get('datas')
if data_result.to_s != ''
data_result_parse = JSON.parse(data_result)
# puts data_result_parse
if data_result_parse.is_a?(Hash)
data_result_parse.each_pair do |k, v|
if v.is_a?(Array)
v.each do |item|
temp_event = LogStash::Event.new(new_event)
temp_event.remove('@version')
temp_event.set('name', item['name'])
temp_event.set('department', item['details']['department'])
temp_event.set('email', item['details']['email'])
temp_event.set('engine', 'test_data')
new_event_block.call(temp_event)
end
else
puts k, v
end
end
#event.cancel
else
puts data_result
end
end
# remove redundant fields
event.remove('datas')
"
}
Where:
- LogStash::Event.new() creates a new event
- new_event_block.call() sends the newly created event on to the output stage
- to discard the original document, add event.cancel at the end to cancel it
- before calling JSON.parse(), make sure the field exists and is not empty (the code above includes this check); otherwise you get:
[ERROR] 2024-12-14 01:18:25.221 [[main]>worker0] ruby - Ruby exception occurred: no implicit conversion of nil into String {:class=>"TypeError", :backtrace=>["json/ext/Parser.java:173:in initialize'", "json/ext/Parser.java:150:in new'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/json-1.8.6-java/lib/json/common.rb:155:in parse'", "(ruby filter code):13:in block in filter_method'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:96:in inline_script'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:89:in filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159:in do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178:in block in multi_filter'", "org/jruby/RubyArray.java:1821:in each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175:in multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:134:in multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:300:in block in start_workers'"]}
5.9 Packaging the Ruby code into a file
The Ruby code above has grown quite long, and keeping it all in the Logstash configuration file has several drawbacks:
- you lose the syntax highlighting and error checking of a code editor
- the code is large and loosely scattered through the configuration file, which makes it hard to maintain
- it is simply not very readable
So instead we store the Ruby code in a separate xxx.rb file and reference it by path.
- Configure the filter to load the script file:
filter {
ruby {
path => "/etc/logstash/xxx.rb"
script_params => { "percentage" => 0.9 }
}
}
- Package the Ruby code into the file
When Logstash loads a Ruby script file, the file must define two functions:
# runs once at Logstash startup; params corresponds to script_params in the config file
def register(params)
@drop_percentage = params["percentage"]
end
# receives an event as input and returns a list of events, which is how multiple output events are produced
# to drop an event, simply leave it out of the returned list; event.cancel is not needed here
def filter(event)
if rand >= @drop_percentage
return [event]
else
return [] # return empty array to cancel event
end
end
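As a rough sketch (not from the original post), the datas-splitting logic from section 5.8 could be moved into such a file. The path /etc/logstash/split_datas.rb, the script_params key engine, and the field names are assumptions carried over from the earlier example:
# /etc/logstash/split_datas.rb -- hypothetical path, loaded via the ruby filter's path option
require 'json'

def register(params)
  # tag written to each split record; configurable through script_params
  @engine_tag = params["engine"] || "test_data"
end

def filter(event)
  raw = event.get('datas')
  event.remove('datas')
  results = [event]              # keep the original record; remove it from this list to discard it
  return results if raw.to_s == ''

  parsed = JSON.parse(raw)
  return results unless parsed.is_a?(Hash)

  # copy the remaining fields of the original event as the base for each child record
  base = event.to_hash.reject { |k, _| k == '@version' }
  parsed.each_value do |items|
    next unless items.is_a?(Array)
    items.each do |item|
      child = LogStash::Event.new(base)
      child.set('name', item['name'])
      child.set('department', item['details']['department'])
      child.set('email', item['details']['email'])
      child.set('engine', @engine_tag)
      results << child           # every event in the returned list is passed on to output
    end
  end
  results
end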
5.10 Different processing for different inputs
If there are multiple input sources, for example several MongoDB databases or different kinds of data (MongoDB, files, Kafka, and so on), you can tag each input with a type attribute and then branch on it in the filter:
input {
mongodb {
uri => 'mongodb://user:password@127.0.0.1:27017/database_name?ssl=false'
placeholder_db_dir => '/opt/logstash-mongodb/'
placeholder_db_name => 'xxx.db'
collection => 'aaa'
batch_size => 50
type => 'test_aaa'
}
file {
path => ["/home/test/logstash/data/*.csv"]
start_position => "beginning"
sincedb_path => "/home/test/logstash/db_dir/sincedb.log"
type => "test_bbb"
}
}
filter {
if[type] == "test_aaa" {
mutate {
...
}
...
}
if [type] == "test_bbb" {
mutate {
...
}
...
}
}
5.11 Storing different data in different Elasticsearch indices
To store the different kinds of data in different Elasticsearch indices, you need to:
- Tag each kind of data
For example, as above, set the engine field to test on the source records and to test_data on the split-out records
- Define a separate output for each tag in the output section
For example, change the output section of the Logstash configuration to:
output {
stdout { codec => rubydebug }
if [engine] == "test_data" {
elasticsearch {
hosts => ["https://127.0.0.1:9200"]
index => "test_data"
user => "test"
password => "test"
ssl => true
ssl_certificate_verification => false
}
} else if [engine] == "test" {
elasticsearch {
hosts => ["https://127.0.0.1:9200"]
index => "test"
user => "test"
password => "test"
ssl => true
ssl_certificate_verification => false
}
}
}