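Business scenario
I have recently been working on CA card business processing for a user base of about 7 million. Handling the related operation logs the traditional way is clearly impractical at that scale, and troubleshooting problems becomes a chore. In the end I decided to write everything straight into Elasticsearch (ES) through Logstash. Below is a short write-up of how I set it up. There are of course many other ways to do this, for example Filebeat.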
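Installation and deployment
- Install Logstash. I had previously installed it with Docker; here I use the release archive instead.
- Open the link below and download the file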
https://www.elastic.co/cn/downloads/past-releases
- Select the product and version you need (Logstash 7.1.0 in this write-up)
- Unpack the archive
Installation is complete.
Custom configuration
1. Create the file
/work/elasticsearch/logstash-7.1.0/config/logstash-omc.conf
2. Sample log line
2020-01-14 17:13:18,419 DEBUG http-bio-8082-exec-1 - {"createAt":"2020-01-14 17:13:03","status":"0","developer":null,"servid":"760000","action":"UPDATE","devno":"keyno01","devtype":1,"modifyAt":"2020-01-14 17:13:03","accountid":"131022","servstatus":null,"uapuserid":"152022","serialno":"76a58c28cff741aba621a6098cebaf6c"}
3. Example logstash-omc.conf configuration file
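# Sample Logstash configuration for a simple
# file -> Logstash -> Elasticsearch pipeline.
input {
  file {
    # All of the relevant business logs are written to this file
    path => ["/work/docker/bandDev.log"]
    type => "apache-log"
    start_position => "beginning"
    # ignore_older => 0 is left out here: in recent file input versions a value
    # of 0 can cause every file to be treated as too old and skipped
  }
}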
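filter {
  grok {
    match => {
      # Regular expression written to fit the log format: the leading
      # timestamp/level/thread text is matched first, then the JSON payload
      # is captured as Msg
      "message" => ["([\s\S]*) (?<Msg>({.*?}))"]
    }
    # Overwrite the existing message field
    overwrite => ["message"]
    # Drop fields that are not needed
    remove_field => ["host","beat","offset"]
  }
  json {
    # Parse the captured JSON string and store the result under the xa field
    source => "Msg"
    target => "xa"
  }
}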
output {
  elasticsearch {
    # Elasticsearch connection address
    hosts => ["localhost:9200"]
    # Name of the index in Elasticsearch
    index => "xa-logstash"
    #user => "elastic"
    #password => "changeme"
  }
}
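To get a feel for what the grok and json filters do to the sample log line from step 2, here is a rough Python sketch of the same two steps. It is only an approximation for experimenting with the pattern, not how Logstash works internally: Python's `re` module stands in for grok's regex engine (so the named group is written `(?P<Msg>...)` and the braces are escaped), and the function name `parse_line` is made up for this illustration.

```python
import json
import re

# Python spelling of the grok pattern "([\s\S]*) (?<Msg>({.*?}))".
# The lazy ".*?" stops at the first "}", so this assumes a flat JSON payload.
LINE_PATTERN = re.compile(r"([\s\S]*) (?P<Msg>\{.*?\})")


def parse_line(message):
    """Return a dict shaped roughly like the event the pipeline would index."""
    event = {"message": message}
    match = LINE_PATTERN.search(message)
    if match is None:
        # grok tags events it cannot match with _grokparsefailure
        event["tags"] = ["_grokparsefailure"]
        return event
    # grok step: keep the captured JSON text in the Msg field
    event["Msg"] = match.group("Msg")
    # json step: source => "Msg", target => "xa"
    event["xa"] = json.loads(event["Msg"])
    return event


# The sample log line from step 2, split across string literals for readability
sample = (
    '2020-01-14 17:13:18,419 DEBUG http-bio-8082-exec-1 - '
    '{"createAt":"2020-01-14 17:13:03","status":"0","developer":null,'
    '"servid":"760000","action":"UPDATE","devno":"keyno01","devtype":1,'
    '"modifyAt":"2020-01-14 17:13:03","accountid":"131022",'
    '"servstatus":null,"uapuserid":"152022",'
    '"serialno":"76a58c28cff741aba621a6098cebaf6c"}'
)

event = parse_line(sample)
print(event["xa"]["action"], event["xa"]["serialno"])
```

If the log format ever starts carrying nested JSON objects, the lazy `{.*?}` part would cut the payload short at the first closing brace, so the pattern would need adjusting at that point.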
Starting Logstash
# Change into the bin directory
e24-6:bin dailong$ pwd
/work/elasticsearch/logstash-7.1.0/bin
e24-6:bin dailong$ ./logstash -f /work/elasticsearch/logstash-7.1.0/config/logstash-omc.conf
Sending Logstash logs to /work/elasticsearch/logstash-7.1.0/logs which is now configured via log4j2.properties
[2020-01-16T16:30:18,257][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-01-16T16:30:18,285][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.0"}
[2020-01-16T16:30:27,887][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-01-16T16:30:28,175][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-01-16T16:30:28,250][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-16T16:30:28,254][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-16T16:30:28,292][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2020-01-16T16:30:28,299][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2020-01-16T16:30:28,464][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-01-16T16:30:28,634][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x1234b969 run>"}
[2020-01-16T16:30:29,188][INFO ][logstash.inputs.file ] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/work/elasticsearch/logstash-7.1.0/data/plugins/inputs/file/.sincedb_20b96030a2ca04be6e9a04f4363503a2", :path=>["/work/docker/bandDev.log"]}
[2020-01-16T16:30:29,246][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2020-01-16T16:30:29,343][INFO ][filewatch.observingtail ] START, creating Discoverer, Watch with file and sincedb collections
[2020-01-16T16:30:29,352][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-01-16T16:30:29,992][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
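Logstash works much like a listener: it watches the log file for changes, and whenever new lines are appended it writes them to ES in the shape defined by the configuration file above.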
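Once data is flowing, troubleshooting comes down to searching the index. The following is a minimal Python sketch of looking up a record by its serial number through the Elasticsearch `_search` endpoint, using only the standard library. It assumes the host and index from the output block above, an unsecured local cluster, and that the parsed JSON ended up under the `xa` field; the helper names `build_search_request` and `search_by_serialno` are invented for this example.

```python
import json
import urllib.request

ES_URL = "http://localhost:9200"  # same address as hosts in the output block
INDEX = "xa-logstash"             # same name as index in the output block


def build_search_request(serialno, size=10):
    """Build a _search request matching documents on xa.serialno."""
    body = {"size": size, "query": {"match": {"xa.serialno": serialno}}}
    return urllib.request.Request(
        url=f"{ES_URL}/{INDEX}/_search",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def search_by_serialno(serialno):
    """Send the request and return the _source of every matching document."""
    request = build_search_request(serialno)
    with urllib.request.urlopen(request, timeout=5) as response:
        hits = json.load(response)["hits"]["hits"]
    return [hit["_source"] for hit in hits]
```

With Elasticsearch running and the sample line indexed, calling `search_by_serialno("76a58c28cff741aba621a6098cebaf6c")` should return the matching documents, each with the parsed fields under `xa`. If security is enabled on the cluster, the request would also need credentials, which this sketch leaves out.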