使用logstash将mongodb数据同步至es

近日,需要整合多源异构数据到es,采用了logstash作为数据处理和同步的服务,对mongodb数据进行同步。

环境

  • elk 8.2.2
  • mongodb
  • logstash-input-mongodb插件

操作步骤

1. 安装logstash mongodb插件

logstash默认是不支持mongodb的,需要安装插件logstash-input-mongodb

1.1 联网环境

进入logstash的安装目录,执行命令:

./bin/logstash-plugin install logstash-input-mongodb

安装成功:


安装logstash插件

也可以使用命令核实是否安装成功:

./bin/logstash-plugin list | grep "logstash-input-mongodb"

1.2 离线环境、内网环境

如果环境不通互联网,使用上述方法安装会报错:

ERROR: Something went wrong when installing logstash-input-mongodb, message: execution expired

这时就需要一些额外的操作。
首先,获取离线安装包,有两种方式:

  1. 找一台在线的机器,安装同样版本的logstash后采用在线方式安装好插件,使用命令导出插件:
./bin/logstash-plugin prepare-offline-pack logstash-input-mongodb

导出成功后会显示导出的压缩包路径:

[test@localhost logstash]$ ./bin/logstash-plugin prepare-offline-pack logstash-input-mongodb
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Offline package created at: /usr/share/logstash/logstash-offline-plugins-8.2.2.zip

You can install it with this command `bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-8.2.2.zip`
  1. 如果elk为8.2.2版本,可以直接点击下载作者导出的压缩包,提取码:jMCP
    获取到离线插件后,将其上传到目标机器,执行命令安装插件:
bin/logstash-plugin install file:///path/to/logstash-offline-plugins-8.2.2.zip

安装成功:


离线安装插件成功

2. 编写logstash配置文件

在logstash配置目录下新建xxx.conf配置文件,例如:

touch /etc/logstash/conf.d/logstash_mongo.conf

新建文件夹用于存放sqlite db库(该文件用于缓存,包括同步进度等数据),编辑文件:

mkdir /opt/logstash-mongodb
vim /etc/logstash/conf.d/logstash_mongo.conf

首先填入input、filter、output三部分内容:

input {
    mongodb {
        uri => 'mongodb://user:password@127.0.0.1:27017/database_name?ssl=false'
        placeholder_db_dir => '/opt/logstash-mongodb/'
        placeholder_db_name => 'xxx.db'
        collection => 'xxx'
        batch_size => 50
    }
}
filter {
}
output {
    elasticsearch {
        action => "index"
        index => "mongo_log_data"
        hosts => ["127.0.0.1:9200"]
    }
    stdout {
        codec => rubydebug
    }
}

三部分缺一不可,否则会报错:

[ERROR] 2024-12-12 00:17:02.065 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 5, column 1 (byte 221) after ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:381:in `block in converge_state'"]}

mongodb若包含用户名和密码,一定要填入uri中(若密码中包含@符号,使用%40代替),否则启动会报错:

[ERROR] 2024-12-12 01:03:27.197 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

3. mongodb插入数据

需确保mongo中至少含有一条数据,否则会报错:

[INFO ] 2024-12-12 18:25:35.813 [[main]-pipeline-manager] mongodb - Registering MongoDB input
[ERROR] 2024-12-12 18:25:40.227 [[main]-pipeline-manager] javapipeline - Pipeline error {:pipeline_id=>"main", :exception=>#<NoMethodError: undefined method `[]' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:99:in `init_placeholder'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:113:in `get_placeholder'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:160:in `block in update_watched_collections'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:158:in `update_watched_collections'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb:182:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:233:in `block in register_plugins'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:232:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:391:in `start_inputs'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:316:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:142:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/logstash_mongo.conf"], :thread=>"#<Thread:0x5a33d505 run>"}
[INFO ] 2024-12-12 18:25:40.231 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[ERROR] 2024-12-12 18:25:40.266 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

4. 启动logstash

在logstash目录下执行命令启动logstash:

./bin/logstash -f /etc/logstash/conf.d/logstash_mongo.conf --debug

如果数据结构比较简单,并且MongoDB和es都连接顺利的话,到此就可以直接去kibana中查看数据了。

由于这里使用的是elk 8.x的版本,其默认启动https连接,因此启动logstash会报证书的错误:

PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

由于这里的es是通过ip直连的,也没有https证书,因此需要在logstash配置文件的output中添加配置项忽略证书校验,修改后的output为:

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["https://10.182.174.89:9200"]
    index => "scan_cuckoo_yara"
    user => "tip"
    password => "hstic@2020"
    ssl => true
    ssl_certificate_verification => false
}

其中sslssl_certificate_verification配置项都需要添加,缺一不可。

5. logstash 深入配置

如果mongodb数据结构比较复杂的话,那上一步多半是启动失败的,这里需要额外的配置来解决。
并且,将mongo数据转换到es,数据并不是一成不变,也需要设定一些数据转换,这里使用logstash的filter配置实现。

5.1 MongoDB字段层级关系保留

当logstash-input-mongodb未配置parse_method字段时,默认为flatten,即将所有字段全部展开。
对于以下数据:

{
    "id": 123,
    "user": {
        "name": "test",
        "department": "test"
    }
}

经过输入插件后,其数据将会变为:

{
    "id": 123,
    "user_name": "test",
    "user_department": "test"
}

对于多层嵌套数据,也会全部展开,使用下划线连接。有的时候,这并不满足我们的要求,更何况对于一些本身字段就带下划线的数据更是不友好。因此,这里我们查阅插件代码,发现其支持以下模式:

  • flatten:所有字段全部展开,默认模式
  • dig:对json数据进行挖掘,针对设定的字段展开,最多挖掘两层
  • simple:所有字段均不展开,只对第一层字段解析,嵌套结构直接转为字符串

这里我们选择自由度比较高的dig模式,添加配置项parse_method => "dig"到input配置中,并通过字段dig_fieldsdig_dig_fields设定需要挖掘的嵌套的字段,设定后的input如下:

input {
  mongodb {
        codec => "json"
        parse_method => "dig"
        dig_fields => ["info", "target"]
        dig_dig_fields => ["file"]
        uri => "mongodb://test:test@127.0.0.1:27017/test"
        placeholder_db_dir => "/home/test/logstash/db_dir"
        placeholder_db_name => "logstash_sqlite.db"
        collection => "test"
    }
}

其中,dig_fields表示第一层需要挖掘的字段,dig_dig_fields表示第二层需要挖掘的字段,上述配置适用于以下json:

{
    "aaa: "aaa",
    "info": {
        "bbb": "bbb",
        "ccc": "ccc"
    },
    "target": {
        "ddd": "ddd",
        "file": {
            "name": "xxxx",
            "size": 123,
            "obj": {
                "test": "test"
            }
        }
    }
}

挖掘后的数据为:

{
    "aaa: "aaa",
    "info_bbb": "bbb",
    "info_ccc": "ccc",
    "target_ddd": "ddd",
    "target_file_name": "xxxx",
    "target_file_size": 123,
    "target_file_obj": "{\"test\"=>\"test\"}"
}

5.2 冗余字段去除

在filter中添加以下配置去掉冗余字段:

  mutate {
    remove_field => ["host","@version","logdate","log_entry","@timestamp","_id"]
  }

5.3 json格式转换

对于MongoDB中的多层级数据,插件处理后会将其直接转为json字符串,logstash中可以在filter中添加以下配置信息:

  json {
    source => "process_tree"
    target => "process_tree"
  }

但该插件为ruby书写,使用to_s转换字符串,该方法转换为的json字符串中的key和value并不是传统的以冒号:连接,而是=>,这里可以更改其插件处理源码,添加逻辑判断,当其仅为string情况时才使用to_s,否则使用to_json,源码位于logstash安装目录下的./vendor/bundle/jruby/2.5.0/gems/logstash-input-mongodb-0.4.1/lib/logstash/inputs/mongodb.rb,更改其run函数下的dig解析部分为:

            elsif @parse_method == 'dig'
              doc.each do |k, v|
                if k != "_id"
                  if (@dig_fields.include? k) && (v.respond_to? :each)
                    v.each do |kk, vv|
                      if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                        vv.each do |kkk, vvv|
                          if /\A[-+]?\d+\z/ === vvv
                            event.set("#{k}_#{kk}_#{kkk}",vvv.to_i)
                          else
                            event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
                          end
                        end
                      else
                        if /\A[-+]?\d+\z/ === vv
                          event.set("#{k}_#{kk}", vv.to_i)
                        elsif vv.is_a? String
                          event.set("#{k}_#{kk}",vv.to_s)
                        else
                          event.set("#{k}_#{kk}",vv.to_json)
                        end
                      end
                    end
                  else
                    if /\A[-+]?\d+\z/ === v
                      event.set(k,v.to_i)
                    elsif v.is_a? String
                      event.set(k,v.to_s)
                    else
                      event.set(k,v.to_json)
                    end
                  end
                end
              end

然后再使用logstash转换json就没问题了

5.4 时间转换

如果源数据中含有时间类型的字段,直接启动logstash可能会报错:

Missing Converter handling for full class name=org.jruby.gen.RubyObject4, simple name=RubyObject4

或者不报错,但在es中时间字段显示为形如\"2024-12-09 20:16:09 UTC\"这样的非预期格式的string类型字符串。理想情况下字段转换后在插入es时应自动匹配date类型。
这里首先确保上一步的parse_method => "dig"配置已添加,然后在filter中书写时间转换配置:

  mutate {
    gsub => [
      "info_started", "\"", "",
      "info_ended", "\"", "",
      "info_started", " UTC", "",
      "info_ended", " UTC", ""
    ]
  }
  date {
    match => [ "info_started", "yyyy-MM-dd HH:mm:ss" ]
  }
  date {
    match => [ "info_ended", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }

其中:

  • mutate表示字段变更,gsub表示字符串替换,这里将字符串首尾的\"都去掉,并去掉UTC符号
  • date表示时间匹配,match中的第二个参数表示匹配的格式,若需要根据匹配后的值新增字段,则可以设定target

5.5 类型转换

插件的dig模式仅将字段的值转为string类型,对于一些int类型数据,需要添加filter配置转换其格式:

  mutate {
    convert => {
      "info_duration" => "integer"
      "info_id" => "integer"
      "target_file_size" => "integer"
    }
  }

5.6 字段重命名

更改字段名称,也可将字段由aaa变为aaa.bbb:

  mutate {
    rename => {
      "info_started" => "start_date"
      "target_file_sha1" => "[file_info][sha1]"
    }
  }

5.7 字段添加

可以为es记录添加字段:

  mutate {
    add_field => {
      "engine" => "test"
    }
  }

5.8 一条数据解析为多条es记录

源数据中存在列表类型的字段,每条列表为json对象,想要将其拆分出来转换为多条es记录,并存储至不同的bucket中。这种情况下,传统的logstash支持的filter的各种范式就不满足要求了,但可以在filter中书写ruby脚本去处理。
例如,对于数据:

{
  "aaa": "aaa",
  "datas": {
    "zhangsan": [
      {
        "name": "zhangsan",
        "details": {
          "email": "zhangsan@163.com",
          "department": "zhangjiakou"
        }
      }
    ],
    "lisi": [
      {
        "name": "lisi",
        "details": {
            "email": "lisi@163.com",
            "department": "zhangjiakou"
        }
      }
    ]
  }
}

可以在filter中添加以下配置:

  ruby {
    code => "
        new_event_clone = event.clone
        new_event_clone.remove('datas')
        new_event = new_event_clone.to_hash

        data_result = event.get('datas')
        if data_result.to_s != ''
            data_result_parse = JSON.parse(data_result)
            # puts data_result_parse
            if data_result_parse.is_a?(Hash)
                data_result_parse.each_pair do |k, v|
                    if v.is_a?(Array)
                        v.each do |item|
                            temp_event = LogStash::Event.new(new_event)
                            temp_event.remove('@version')
                            temp_event.set('name', item['name'])
                            temp_event.set('department', item['details']['department'])
                            temp_event.set('email', item['details']['email'])
                            temp_event.set('engine', 'test_data')
                            
                            new_event_block.call(temp_event)
                        end
                    else
                        puts k, v
                    end
                end
                #event.cancel
            else
                puts data_result
            end
        end
        # remove rebundant fields
        event.remove('datas')
    "
  }

其中:

  • 使用LogStash::Event.new()新建一个记录事件
  • 使用new_event_block.call()负责将新产生的记录发送到output处理
  • 若源数据想要丢弃,可以在末尾添加代码event.cancel取消记录发送
  • 使用JSON.parse()之前需确保处理的字段存在且不为空(上述代码中添加了判断语句),否则会报错:
[ERROR] 2024-12-14 01:18:25.221 [[main]>worker0] ruby - Ruby exception occurred: no implicit conversion of nil into String {:class=>"TypeError", :backtrace=>["json/ext/Parser.java:173:in initialize'", "json/ext/Parser.java:150:in new'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/json-1.8.6-java/lib/json/common.rb:155:in parse'", "(ruby filter code):13:in block in filter_method'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:96:in inline_script'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:89:in filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159:in do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178:in block in multi_filter'", "org/jruby/RubyArray.java:1821:in each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175:in multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:134:in multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:300:in block in start_workers'"]}

5.9 ruby代码封装为文件

上述写的ruby代码太多了,全部写在logstash配置文件中存在以下缺点:

  • 不便于使用代码编辑器的代码高亮、纠错等功能
  • 代码量太大,松散地分布于配置文件中,不便于维护
  • 不够美观
    因此,将ruby代码单独储存在xxx.rb文件中,再通过路径引入。
  1. 配置filter引入代码文件:
    filter {
      ruby {
        path => "/etc/logstash/xxx.rb"
        script_params => { "percentage" => 0.9 }
      }
    }
  1. 封装ruby代码到文件
    logstash在引入ruby代码文件时,需要在ruby代码文件中定义两个函数:
# 在logstash启动时就执行,其中的params参数对应配置文件中的script_params 
def register(params)
    @drop_percentage = params["percentage"]
end

# 接收event事件作为输入,输出为event列表,以此实现多记录事件输出
# 如果想要丢掉某个事件,在输出列表中排除即可,无需再使用event.cancel
def filter(event)
    if rand >= @drop_percentage
        return [event]
    else
        return [] # return empty array to cancel event
    end
end

5.10 不同数据输入采用不同的处理

如果具有多输入源,例如:多个MongoDB数据、不同类型数据(包括MongoDB、文件、kafka等),可以在输入时配置type属性打标,再在filter中分条件采取不同的处理方式:

input {
    mongodb {
        uri => 'mongodb://user:password@127.0.0.1:27017/database_name?ssl=false'
        placeholder_db_dir => '/opt/logstash-mongodb/'
        placeholder_db_name => 'xxx.db'
        collection => 'aaa'
        batch_size => 50
        type => 'test_aaa'
    }
    file {
      path => ["/home/test/logstash/data/*.csv"]
      start_position => "beginning"
      sincedb_path => "/home/test/logstash/db_dir/sincedb.log"
      type => "test_bbb"
    }
}
filter {
    if[type] == "test_aaa" {
        mutate {
                     ...
        }
                ...
    }
     
    if [type] == "test_bbb" {
        mutate {
                     ...
        }
                ...
    }
}

5.11 不同数据存入不同es bucket

对于多种数据,需要分别将其存入不同bucket的es中,采用不同的index,需要:

  1. 为每种数据打标
    例如上文中举例,为源数据打标engine字段值为test,为拆分后的数据设定enginetest_data
  2. output中为不同标签定义不同输出
    例如更改logstash配置中的output部分为:
output {
    stdout { codec => rubydebug }
    if [engine] == "test_data" {
      elasticsearch {
        hosts => ["https://127.0.0.1:9200"]
        index => "test_data"
        user => "test"
        password => "test"
        ssl => true
        ssl_certificate_verification => false
      }
    } else if [engine] == "test" {
      elasticsearch {
        hosts => ["https://127.0.0.1:9200"]
        index => "test"
        user => "test"
        password => "test"
        ssl => true
        ssl_certificate_verification => false
      } 
    }
}

参考链接

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容