ELK 搭建
备注:Elastic(8.5.2)、Logstash(8.5.2) 、Kibana(8.5.2)、Zookeeper、Kafa(2.6.0)、Filebeat(8.5.2)
数据流转:
filebeat ——》 kafka+zookeeper ——》 logstash ——》 elasticsearch+kibana
Filebeat(日志采集):
filebeat的配置文件filebeat.yml
常用配置:
-
filebeat.inputs(用于定义输入数据的类型):
type: log 表示输入类型是log类型(默认值是log,可选值为stdin)
enabled: true 表示启用手动配置filebeat,而不是模块方式配置。
paths: 指定多个文件的path(需要监控的日志文件)
fields: 自定义的字段
name:设置一个名称区分日志是哪台机器的,默认为主机名称,习惯设为ip
output.kafka(用于定义输出到kafka):
-
processors: drop_fields: 不要的字段
官方配置文档:([Configure inputs | Filebeat Reference 8.6] | Elastic)
Logstash:
- Shipper:收集日志数据,负责监控本地日志文件的变化
- Broker:起数据缓冲的作用,用于连接多个shipper和多个indexer
- Indexer:
logstash的数据流:
input —> decode —> filter —> encode —> output
logstash的input:
用于数据的输入
logstash的filter:
用于数据的处理和转换等
logstash的output:
用于数据的输出
logstsh的decode、encode:
codec => plain: 无格式
codec => json: json格式
codec => json_lines: 多行的json格式
支持的数据类型:
- Boolean
- Number
- String
Logstash数据流中的数据被称为Event对象,Event是JSON结构。Event的属性称为字段。
如果需要引用这些字段,则[field名]
条件判断:
equality:==
、!=
、<
、>
、<=
、>=
regexp:=~
、!~
布尔:and
、or
、nand
、xor
inclusion:in
、not in
Input Plug 常用插件:
-
从文件监听读取
input{ file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
close_older:设置文件多长时间(s)没有变化就关闭对此文件的监听
codec:输入数据之后,对数据进行解码
delimiter:文件内容的 行分隔符,一般使用默认的。
discover_interval:间隔多长时间(s)检测路径下是否有新的文件。
exclude:指定排除的文件,如
exclude => .gz
id:插件唯一标识
path:要监听的日志文件
type:event的type类型
start_position:从文件开通还是结尾读取文件内容,默认是结尾。如果需要导入文件中的老数据,可以设置为'beginning'。这个参数只在第一次启动logstash时有效,如果文件异常存在于sincedb的记录内,则此配置无效。
stat_interval:间隔多次时间(s)检测文件是否被更新。设置太短会增加系统负荷。
tags:向event对象中添加自定义的标签,更方便后续处理使用
add_field:向event对象中添加自定义的字段
-
从beats监听读取:接收Filebeat或者其他beat发送的Events。
input { beats { port => 5044 } }
port:监听的端口
id:插件唯一标识
codec:输入数据之后,对数据进行解码
-
从TCP监听读取:TCP插件有两种工作模式,“Client”和“Server”,分别用于发送网络数据和监听网络数据。
input { tcp { port => 41414 } }
-
从redis监听读取:
input { redis { data_type => "list" #logstash redis插件工作方式 key => "logstash-test-list" #监听的键值 host => "127.0.0.1" #redis地址 port => 6379 #redis端口号 } }
Filter plugin常用过滤插件:
-
grok正则捕获:
grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log
预定义表达式调用
Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}
SYNTAX:表示已经安装的正则表达式的名称
SEMANTIC:表示从Event中匹配到的内容的名称
例子1:Event的内容为“[debug] 127.0.0.1 - test log content”。
匹配%{IP:client}将获得“client: 127.0.0.1”的结果,前提安装了IP表达式。
如果你在捕获数据时想进行数据类型转换可以使用%{NUMBER:num:int}这种语法,默认情况下,所有的返回结果都是string类型,当前Logstash所支持的转换类型仅有“int”和“float”;
例子2:
日志文件http.log内容:55.3.244.1 GET /index.html 15824 0.043
表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
filter { grok { match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }
自定义表达式调用:
语法:(?<field_name>the pattern here)
举例:捕获10或11和长度的十六进制queue_id。
可以使用表达式(?<queue_id>[0-9A-F]{10,11})
步骤:
1、在Logstash根目录下创建文件夹“patterns”,在“patterns”文件夹中创建文件“extra”(文件名称无所谓,可自己选择有意义的文件名称);
2、在文件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可;
3、使用自定义的表达式时需要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录;
## Logstash配置 ##
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
date时间处理插件:该插件用于时间字段的格式转换
mutate数据修改插件:它提供了丰富的基础类型数据处理能力。可以重命名,删除,替换和修改事件中的字段。
-
JSON插件:用于解码JSON格式的字符串
## 事例配置,message是JSON格式的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}" filter { json { source => "message" target => "jsoncontent" } }
elasticsearch查询过滤插件:
更多插件:Split、GeoIP、Ruby等等。
Output plugin常用过滤插件:
-
ElasticSearch输出插件:将数据输出到elasticsearch中:
以天为索引:
logstash-%{+yyyy.MM.dd}
以周为索引:
logstash-%{+xxxx.ww}
以月为索引:
logstash-%{+yyyy.MM}
output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "filebeat-%{type}-%{+yyyy.MM.dd}" template_overwrite => true } }
-
Redis输出插件:将数据输出到redis中
output { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
例子:
nginx access.log的表达式:
%{IPORHOST:client} - - \[%{HTTPDATE:timestamp}\] "(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
nginx error.log的表达式:
%{DATESTAMP:request_time} \[%{LOGLEVEL:log_level}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:error_message}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: "%{WORD:method} %{NOTSPACE:request}( HTTP/%{NUMBER:http_version})")(?:, host: "%{HOSTNAME:host}")?(?:, referrer: "%{NOTSPACE:referrer}")?
springboot log的表达式:
%{TIMESTAMP_ISO8601:createTime}%{SPACE}%{LOGLEVEL:LEVEL}%{SPACE}%{INT:pid}%{SPACE}---%{SPACE}\[%{DATA:threadName}\]%{SPACE}%{JAVACLASS:javaClass}%{SPACE}:%{SPACE}%{GREEDYDATA:msg}
或者
(?m)%{TIMESTAMP_ISO8601:createTime}%{SPACE}%{LOGLEVEL:LEVEL}%{SPACE}%{INT:pid}%{SPACE}---%{SPACE}\[%{DATA:threadName}\]%{SPACE}%{JAVACLASS:javaClass}%{SPACE}:%{SPACE}%{GREEDYDATA:msg}
实战:
两台机器:一台数据(data)服务器、一台应用(application)服务器。
data机器 安装 logstash-8.5.2、elasticsearch-8.5.2、kibana-8.5.2,
application机器 安装 filebeat-8.5.2,
application机器:
-
配置filebeat文件:
#输入配置 filebeat.inputs: - type: log enabled: true #可以配置多个日志路径 paths: - /var/log/*.log #输出配置,输出到logstash output.logstash: hosts: ["127.0.0.1:5044"] #输出到控制台,适合测试 #output.console: # pretty: true processors: - include_fields: fields: ['message','log','fields']
启动filebeat:
./filebeat -e -c my_filebeat.yml
data机器:
调整jvm.options文件的: -xml、-xmx
-
配置logstash文件:
#输入来自beats input { beats { port => 5044 } } #过滤 filter { #消息分割处理 grok { #分割字符串的格式 match => %{IPORHOST:client} - - \[%{HTTPDATE:timestamp}\] "(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) #删除不要的字段 remove_field => [] } #时间处理 date { #timestamp字段处理。把消息中的timestamp字段赋值给@timestamp match => ["timestamp", "dd/MMM/yyy:HH:mm:ss Z"] } # mutate { #删除不要的字段 remove_field => ["timestamp"] } } #输出到elastic中 output { #输出到es中 elasticsearch { hosts => ["127.0.0.1:9200"] #index => "filebeat-%{type}-%{+yyyy.MM.dd}" index => "filebeat-%{type}-%{+yyyy.MM}" template_overwrite => true } #输出到控制台 #stdout { # codec => rubydebug #} }
-
启动:
测试用:输入=控制台,输出=控制台
./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
指定配置文件启动: `./logstash -f my_logstash.conf`