ela+fiebeat+logstash 容器化 kibana web数据展示 基础
之前我们已经可以把kibana把日志展示出来了 现在我们可以增加一些新的功能 比如增加过滤条件,在上面这个文章所构建的环境的基础上。
基本做法就是
在日志挖掘的logstash的配置文件中添加filter条件
这里是在 `logstash/logstash_stdout.conf`中添加
使用Grok Filter Plugin
编辑解析Web日志
现在,您有一个工作流程,从Filebeat读取日志行。但是您会注意到日志消息的格式不理想。您要解析日志消息以从日志中创建特定的命名字段。为此,您将使用grok过滤器插件。
因为grok过滤器插件在传入的日志数据中查找模式,所以配置插件需要您决定如何识别用例感兴趣的模式。来自Web服务器日志示例的代表行如下所示:
信息 | 字段名 |
---|---|
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User agent | agent |
使用方法,编辑 logstash/logstash_stdout.conf
写入下面的内容:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
假如你在这之前已经运行了logstash
和 filebeat
。要想生效现在的过滤配置,您需要强制Filebeat
从头开始读取日志文件。
不必重新启动Logstash
来接收更改,但是需要删除 filebeat
下的注册表文件 registry
,此文件一般在安装目录下的 data
目录下。
由于Filebeat
存储在注册表中收集的每个文件的状态,因此删除注册表文件会强制Filebeat
读取从头开始捕获的所有文件。
接下来,使用以下命令重新启动Filebeat
即可
使用Geoip Filter
插件修改数据
除了解析日志数据以获得更好的搜索之外,过滤插件也可以从现有数据中导出补充信息。例如,geoip插件会查找IP地址,从地址中导出地理位置信息,并将该位置信息添加到日志中。
将Logstash
实例配置为使用geoip
过滤器插件,将以下行添加到文件的filter
部分
geoip {
source => "clientip"
}
完整的示例:
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch { -----输出到ela中
hosts => ["elasticsearch:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
# stdout { codec => rubydebug } # 假如有问题,可以打开此行进行调试
}
修改好保存,生效的话,同样先删除Filebeat
注册文件,之后重启 filebeat
kibana截图展示
测试您的管道编辑
现在,
Logstash
管道配置为将数据索引到Elasticsearch
集群中,您可以查询Elasticsearch
。
ela的三个表示对应关系 类似于mangodb
索引 ---- 一个数据库
类型 ---- 一个表
文档 ---- 一条数据
过滤nginx
的logstash/logstash_stdout.conf
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
if ([fileset][module] == "nginx") {
if ([fileset][name] == "access") {
grok {
match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
geoip {
source => "[nginx][access][remote_ip]"
target => "[nginx][access][geoip]"
}
}
else if [fileset][name] == "error" {
grok {
match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
remove_field => "message"
}
mutate {
rename => { "@timestamp" => "read_timestamp" }
}
date {
match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
remove_field => "[nginx][error][time]"
}
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
manage_template => false
index => "ddd---%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
# stdout { codec => rubydebug } # 假如有问题,可以打开此行进行调试
}