logstash 7.x
一、下载软件
二、安装部署
2.1. 准备patterns文件
2.2. 创建配置文件
2.3. 后台运行
一、下载软件
https://www.elastic.co/cn/downloads/logstash
二、安装部署
2.1. 准备patterns文件
到logstash的目录下,创建patterns目录
mkdir patterns
在patterns目录里新建一个java的patterns文件
内容如下:
user-center
MYAPPNAME ([0-9a-zA-Z_-]*)
RMI TCP Connection(2)-127.0.0.1
MYTHREADNAME ([0-9a-zA-Z._-]|(|)|\s)*
就是一个名字叫做java的文件,不需要文件后缀
2.2. 创建配置文件
到logstash的config目录下创建logstash.conf文件
需要修改以下地方
filter 块中的patterns_dir路径
output 块中的密码
vim logstash.conf
input {
beats {
port => 5044
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
if [fields][docType] == "sys-log" {
grok {
patterns_dir => ["/opt/logstash-7.6.1/patterns"]
match => { "message" => "[%{NOTSPACE:appName}:%{NOTSPACE:serverIp}:%{NOTSPACE:serverPort}] %{TIMESTAMP_ISO8601:logTime} %{LOGLEVEL:logLevel} %{WORD:pid} [%{MYAPPNAME:traceId}] [%{MYTHREADNAME:threadName}] %{NOTSPACE:classname} %{GREEDYDATA:message}" }
overwrite => ["message"]
}
date {
match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS Z"]
}
date {
match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS"]
target => "timestamp"
locale => "en"
timezone => "+08:00"
}
mutate {
remove_field => "logTime"
remove_field => "@version"
remove_field => "host"
remove_field => "offset"
}
if [fields][docType] == "point-log" {
grok {
patterns_dir => ["/opt/logstash-7.6.1/patterns"]
match => {
"message" => "%{TIMESTAMP_ISO8601:logTime}|%{MYAPPNAME:appName}|%{WORD:resouceid}|%{MYAPPNAME:type}|%{GREEDYDATA:object}"
}
}
kv {
source => "object"
field_split => "&"
value_split => "="
}
date {
match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS Z"]
}
date {
match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS"]
target => "timestamp"
locale => "en"
timezone => "+08:00"
}
mutate {
remove_field => "message"
remove_field => "logTime"
remove_field => "@version"
remove_field => "host"
remove_field => "offset"
}
}
if [fields][docType] == "mysqlslowlogs" {
grok {
match => [
"message", "#\s+User@Host:\s+%{USER:user}[[]]+]\s+@\s+(?:(?<clienthost>\S) )?[(?:%{IP:clientip})?]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S])",
"message", "#\s+User@Host:\s+%{USER:user}[[]]+]\s+@\s+(?:(?<clienthost>\S) )?[(?:%{IP:clientip})?]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S])",
"message", "#\s+User@Host:\s+%{USER:user}[[]]+]\s+@\s+(?:(?<clienthost>\S) )?[(?:%{IP:clientip})?]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S])",
"message", "#\s+User@Host:\s+%{USER:user}[[]]+]\s+@\s+(?:(?<clienthost>\S) )?[(?:%{IP:clientip})?]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S])"
]
}
date {
match => ["timestamp_mysql","yyyy-MM-dd HH:mm:ss.SSS","UNIX"]
}
date {
match => ["timestamp_mysql","yyyy-MM-dd HH:mm:ss.SSS","UNIX"]
target => "timestamp"
}
mutate {
convert => ["query_time", "float"]
convert => ["lock_time", "float"]
convert => ["rows_sent", "integer"]
convert => ["rows_examined", "integer"]
remove_field => "message"
remove_field => "timestamp_mysql"
remove_field => "@version"
}
}
}
output {
if [fields][docType] == "sys-log" {
elasticsearch {
hosts => ["http://localhost:9200"]
user => "elastic"
password => "qEnNfKNujqNrOPD9q5kb"
index => "sys-log-%{+YYYY.MM.dd}"
}
}
if [fields][docType] == "point-log" {
elasticsearch {
hosts => ["http://localhost:9200"]
user => "elastic"
password => "qEnNfKNujqNrOPD9q5kb"
index => "point-log-%{+YYYY.MM.dd}"
routing => "%{type}"
}
}
if [fields][docType] == "mysqlslowlogs" {
elasticsearch {
hosts => ["http://localhost:9200"]
user => "elastic"
password => "qEnNfKNujqNrOPD9q5kb"
index => "mysql-slowlog-%{+YYYY.MM.dd}"
}
}
}
复制
2.3. 后台运行
nohup bin/logstash -f config/logstash.conf &