1 简介
这篇文章介绍将nginx日志导入elasticsearch,方便之后进行可视化分析。关于elasticsearch和kibana的安装可以参考我的elasticsearch基础。
2 filebeat + logstash
filebeat是安装在客户端用来采集日志文件的轻量软件,logstash的功能是对filebeat传来的日志进行处理之后存入elasticsearch。也可以直接把logstash安装在客户端采集日志,但是logstash没有filebeat轻量。
2.1 filebeat和logstash安装
#filebeat下载地址https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.6.2-x86_64.rpm
rpm -ivh filebeat-8.6.2-x86_64.rpm
#logstash下载地址https://artifacts.elastic.co/downloads/logstash/logstash-8.6.2-linux-x86_64.tar.gz
tar -zxvf logstash-8.6.2-linux-x86_64.tar.gz
2.2 filebeat和logstash测试
#part 1 单独测试logstash
cd logstash-8.6.2
bin/logstash -e 'input { stdin { } } output { stdout {} }'
#上面表示logstash的输入(input)是标准输入(stdin{}),输出(output)是
#标准输出(stdout);直接键盘输入test,logstash会在控制台是打印test
#logstash启动需要一点时间,等就绪后测试
#part 2 filebeat输出到logstash测试
#创建logstash配置文件logstash-8.6.2/config/logstash-test.conf
#输入时tcp端口5044,输出是标准输出
[root@oneNginx1 config]# cat logstash-test.conf
input {
beats{
port => 5044
}
}
output {
stdout {
codec => rubydebug
}
}
#启动logstash
bin/logstash -f config/logstash-test.conf
#备份/etc/filebeat/filebeat.yml,创建新的filebeat.yml
[root@oneNginx1 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: filestream
id: test
enabled: true
paths:
- /tmp/access.log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
output.logstash:
hosts: ["18.1.97.234:5044"]
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
#启动filebeat
systemctl start filebeat
#测试
echo "test" >> /tmp/access.log
结论:
- filebeat传给logstash的message字段存放了原日志文本
- filebeat给日志增加了很多客户端服务器信息相关的字段
2.3 logstash解析nginx日志
2.3.1 grok调试
logstash的配置可以分为3大块:input,filter,output。grok是filter的一个常用正则模块,功能是通过正则表达式将日志分割成多个字段,下面是使用Kibana调试使用grok结构化nginx日志。
图片1.png
grok中内置了一些预定义的正则表达式用来匹配常见的数据类型,如IPORHOST匹配ip,WORD匹配单个单词,HTTPDATE匹配时间。grok的规则表达式就是%{pattern:varname},将通过正则匹配的字符串存入变量名。可以使用patter_dir添加预定义patter文件。另外一种表达式是(?<varname>pattern)。
2.3.2 存储日志到elasticsearch
#配置logstash
[root@informixtest config]# cat logstash-test2.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
}
}
filter {
grok {
match => { "message" => "%{IPORHOST:clientip} - - \[%{HTTPDATE:htptime}\] \"%{WORD:htpmethod} %{URIPATHPARAM:htpurl} HTTP/%{NUMBER:htpver}\" %{NUMBER:htpstatus} %{NUMBER:htpbytes} \"%{DATA:htpref}\" \"%{DATA:htpagent}\"" }
remove_field => ["message"]
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "test"
user => "elastic"
password => "rooT96588"
}
}
#上面的配置文件中,使用grok将filebeat传来的日志的message字段进行
#结构化,然后删除message
#将日志同时传输给elasticsearch和标准输出,不需要调试的时候可以删除stdout
#注上面配置文件中index表示把日志放入test索引下面
bin/logstash -f config/logstash-test2.conf
echo '18.1.100.111 - - [31/Mar/2023:15:23:21 +0800] "GET /hnb_ngx_status.txt HTTP/1.1" 200 114 "-" "python-requests/2.25.0"' >> /tmp/access.log
查询数据是否存入elasticsearch
存入elasticsearch.png