filebeat.yml
filebeat.inputs:
# nginx_access: ship the nginx access log.
- type: log
  id: nginx-access-id
  enabled: true
  paths:
  - /usr/local/nginx/logs/access.log
  backoff: '1s'
  # Place the custom fields at the top level of the event instead of
  # under `fields`, so Logstash sees a plain `type` field.
  fields_under_root: true
  fields:
    # Important: the Logstash filter branches on this `type` value.
    type: nginx-access
# nginx_error: ship the nginx error log.
- type: log
  id: nginx-error-id
  enabled: true
  paths:
  - /usr/local/nginx/logs/error.log
  backoff: '1s'
  # Report multi-line entries as a single event: a line that does not
  # start with four digits is appended to the line before it.
  multiline:
    pattern: '^\d{4}'
    negate: true
    match: after
  # Place the custom fields at the top level of the event instead of
  # under `fields`, so Logstash sees a plain `type` field.
  fields_under_root: true
  fields:
    # Important: the Logstash filter branches on this `type` value.
    type: nginx-error
# laravel: ship the Laravel project logs.
- type: log
  id: piao-id
  enabled: true
  paths:
  - /home/www/laravel/storage/logs/*.log
  backoff: '1s'
  # Report multi-line entries as a single event: a line that does not
  # start with `[` followed by four digits is appended to the line
  # before it.
  multiline:
    pattern: '^\[\d{4}'
    negate: true
    match: after
  # Place the custom fields at the top level of the event instead of
  # under `fields`, so Logstash sees a plain `type` field.
  fields_under_root: true
  fields:
    # Important: the Logstash filter branches on this `type` value.
    type: piao
# Push every event onto a Redis list.
output.redis:
  enabled: true
  # Redis host and port.
  hosts:
  - 'localhost:6379'
  datatype: list
  key: filebeat-redis
  db: 10
对应的 Logstash 配置文件 logstash-redis.conf:
input {
  # Read events from the Redis list.
  redis {
    host      => "127.0.0.1"
    port      => 6379
    db        => 10                # must match `db` in filebeat.yml
    data_type => "list"
    key       => "filebeat-redis"  # must match `key` in filebeat.yml
  }
}
filter {
  # Each branch is selected by the `type` field set in filebeat.yml.
  if [type] == "nginx-access" {
    # Split an nginx access-log line into named fields.
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\"" }
    }

    # Use the timestamp from the nginx log line as the event timestamp.
    date {
      match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }
  else if [type] == "nginx-error" {
    # Split an nginx error-log line into named fields.
    grok {
      match => { "message" => "(?<time_local>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:log_level}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:error_message}(?:, client: (?<clientip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)?(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:http_referrer}\")?" }
    }

    # Use the timestamp from the nginx log line as the event timestamp.
    date {
      match  => [ "time_local", "YYYY/MM/dd HH:mm:ss" ]
      target => "@timestamp"
    }
  }
  else if [type] == "piao" {
    # Only the timestamp is extracted from these entries.
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:time_local}" }
    }

    date {
      match  => [ "time_local", "YYYY-MM-dd HH:mm:ss" ]
      target => "@timestamp"
    }
  }

  # Remove fields that should not be stored, including the temporary
  # `time_local` field that the date filters read.
  mutate {
    remove_field => [
      "ecs", "cloud", "@version", "input", "time_local",
      "[agent][version]", "[agent][ephemeral_id]", "[agent][id]", "[agent][type]"
    ]
  }
}
output {
  elasticsearch {
    hosts    => ["elasticsearch:9200"]
    # Index name is built dynamically from the `type` field and the event date.
    index    => "%{[type]}-%{+YYYY.MM.dd}"
    # Placeholders: fill in your own account and password.
    user     => "logstash"
    password => "your paswrod"
  }
}