Logstash grok self-test examples + common regex patterns
https://blog.csdn.net/qq_34646817/article/details/81232121#t1
grokdebug
Grok Debugger local installation and configuration
https://blog.csdn.net/u013274150/article/details/82415818
https://www.cnblogs.com/kevin-yuan/archive/2012/11/11/2765340.html
Extract, from the string da12bka3434bdca4343bdca234bm, the digits enclosed between the characters a and b, where the character before a must not be c and the character after b must be d.
Clearly, only 3434 satisfies these requirements. How do we extract it?
First, write a regular expression with a capturing group: [^c]a(\d*)bd
Then convert it to a capture-free form using lookbehind and lookahead: (?<=[^c]a)\d*(?=bd)
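A quick sanity check (a sketch; it assumes a local ruby on the PATH — Logstash itself bundles JRuby):
# prints 3434 -- the only digit run whose surrounding context satisfies both conditions
echo 'da12bka3434bdca4343bdca234bm' | ruby -ne 'puts $_.scan(/(?<=[^c]a)\d+(?=bd)/)'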
Basic syntax
=~ matches a regex
!~ does not match a regex
in, not in
and, or
nand, xor
# Field reference: [name][status] in conditionals; inside strings use the sprintf form %{[name][status]}
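A minimal sketch combining the operators and field references above (the field names are hypothetical):
output {
  if [status] =~ /^5\d\d$/ and [type] in ["nginx", "apache"] {
    stdout { codec => rubydebug }
    # sprintf-style reference inside a string:
    # index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
  }
}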
Plugins
yum install -y rubygems
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l
bin/plugin install|update|uninstall logstash-input-jdbc
#(plugins actually live under the vendor/bundle/jruby/1.9/gems/ directory)
bin/plugin list
#Local plugin install; on success a new entry is appended to the end of the Gemfile (see the example after this list)
bin/logstash-plugin install /path/to/logstash-filter-crash.gem
bin/plugin install logstash-input-log4j2
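For the local .gem install above, the appended Gemfile entry looks something like this (the path hash and version are illustrative):
gem "logstash-filter-crash", :path => "vendor/local_gems/d354312c/logstash-filter-crash-1.0.0"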
RubyGems mirror
https://gems.ruby-china.com/
Testing
#Run in debug mode
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/logstash-simple.conf --debug | tee debug.log
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logback-es.conf
less file
nl file | grep key
# Format text
echo -ne '888888888888'
cat randdata | awk '{print $2}' | sort | uniq -c | tee sortdata
top -H # show individual threads (H toggles the thread view interactively)
#Replace text
sed 's/upstreamtime":-/upstreamtime":0/'
Rsyslog
#Import data into Logstash
nc 127.0.0.1 8888 < olddata
Configuration
Typically, when importing pre-existing data into Elasticsearch, you also need the filter/date plugin to rewrite the default "@timestamp" field value.
https://elkguide.elasticsearch.cn/logstash/plugins/filter/dissect.html
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
---
input {
stdin { }
}
output {
stdout { }
}
input {stdin{}}
output {stdout{codec => rubydebug}}
---
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
---
input {
file {
path => ["/var/log/**/*.log", "/var/log/message"] # absolute paths
type => "system"
start_position => "beginning"
sincedb_path => "/dev/null" # start reading from the beginning on every restart
}
}
---
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
---
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
---
#nginx.conf
log_format json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"host":"$server_addr",'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"domain":"$host",'
'"url":"$uri",'
'"status":"$status"}';
access_log /var/log/nginx/access.log_json json;
# logstash.conf
input {
file {
path => "/var/log/nginx/access.log_json"
codec => "json"
}
}
---
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
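For instance (a hypothetical Apache-style input line):
# 127.0.0.1 - - [10/Aug/2014:08:55:38 +0800] "GET / HTTP/1.1" 200 612
# the grok captures logdate, and the date filter rewrites @timestamp to 2014-08-10T00:55:38.000Z (UTC)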
---
input {stdin{}}
filter {
grok {
    # Custom grok patterns can be collected in one place and pointed to with
    # the patterns_dir option (an option of grok itself, not a key inside match).
    # e.g. /etc/logstash/patterns.d/ containing:
    #   # common postfix patterns
    #   POSTFIX_QUEUEID ([0-9A-F]{6,}|[0-9a-zA-Z]{15,}|NOQUEUE)
    #   # helper patterns
    #   GREEDYDATA_NO_COLON [^:]*
    patterns_dir => ["/path/to/your/own/patterns"]
    # match takes a hash; named captures use the (?<name>pattern) syntax
    # (named-group regex reference: https://blog.csdn.net/lc574260570/article/details/82701295)
    match => {
      "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
      # alternatives:
      #   "(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"  # the (?m) flag lets the pattern match across newlines
      #   "%{SYSLOGBASE} %{DATA:message}"
      #   "(?<date>\d{2}/\d{2}/\d{2})\s(?<time>\d{2}:\d{2}:\d{2},\d{3})\s(?<message>.+)"
    }
    # match can also take an array that tries several patterns in turn:
    # match => [
    #   "message", "(?<request_time>\d+(?:\.\d+)?)",
    #   "message", "%{SYSLOGBASE} %{DATA:message}",
    #   "message", "(?m)%{WORD}"
    # ]
    remove_field => ["message"] # drop the message field after parsing
    # overwrite => ["message"]  # or overwrite the default message field in place
}
}
output {stdout{codec => rubydebug}}
{
"message" => "begin 123.456 end",
"@version" => "1",
"@timestamp" => "2014-08-09T11:55:38.186Z",
"host" => "raochenlindeMacBook-Air.local",
"request_time" => "123.456"
}
---
filter {
grok {
match => {
#%{PATTERN_NAME:capture_name:data_type}; data_type currently supports only two values: int and float
"message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
}
}
}
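With the cast in place, feeding a line like begin 123.456 end yields a numeric field in the rubydebug output (sketch):
"request_time" => 123.456
rather than the string "123.456" produced without the :float suffix.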
---
filter {
dissect {
mapping => {
"message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
#http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
}
convert_datatype => {
pid => "int"
}
}
}
#http://rizhiyi.com/index.do?id=123 http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
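For that sample URL the mapping decomposes as follows (a sketch): %{?url} matches index.do but is discarded, while the %{?arg1}=%{&arg1} pair uses the captured query key itself as the field name:
#   "domain" => "rizhiyi.com"
#   "id"     => "123"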
---
output {
elasticsearch {
hosts => ["192.168.0.2:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}" # index names must not contain uppercase letters; a segment starting with + is treated as a time format and the following characters are parsed as a date pattern
document_type => "%{type}"
flush_size => 20000 # buffer up to 20000 events and send them in one bulk request
idle_flush_time => 10 # if 20000 events have not accumulated within 10 seconds, flush whatever has been buffered
sniffing => true
template_overwrite => true
}
}
------Log4J
<appender name="LOGSTASH" class="org.apache.log4j.net.SocketAppender">
<param name="RemoteHost" value="logstash_hostname" />
<param name="ReconnectionDelay" value="60000" />
<param name="LocationInfo" value="true" />
<param name="Threshold" value="DEBUG" />
</appender>
<root>
<level value="INFO"/>
<appender-ref ref="OTHERPLACE"/>
<appender-ref ref="LOGSTASH"/>
</root>
---
log4j.rootLogger=DEBUG, logstash
###SocketAppender###
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4560
log4j.appender.logstash.RemoteHost=logstash_hostname
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
---
log4j.rootLogger=INFO,console
# for package com.demo.elk, log would be sent to socket appender.
log4j.logger.com.demo.elk=DEBUG, socket
# appender socket
log4j.appender.socket=org.apache.log4j.net.SocketAppender
log4j.appender.socket.Port=4567
log4j.appender.socket.RemoteHost=centos2
log4j.appender.socket.layout=org.apache.log4j.PatternLayout
log4j.appender.socket.layout.ConversionPattern=%d [%-5p] [%l] %m%n
log4j.appender.socket.ReconnectionDelay=10000
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%l] %m%n
----
input {
log4j {
type => "log4j-json"
port => 4560
}
}
------------------
[https://www.cnblogs.com/linjiqin/p/10757261.html](https://www.cnblogs.com/linjiqin/p/10757261.html)
[https://segmentfault.com/a/1190000016192394?utm_source=tag-newest](https://segmentfault.com/a/1190000016192394?utm_source=tag-newest)
[https://blog.csdn.net/BianChengNinHao/article/details/83503295](https://blog.csdn.net/BianChengNinHao/article/details/83503295)
[https://www.cnblogs.com/xing901022/p/4830684.html](https://www.cnblogs.com/xing901022/p/4830684.html)
[Collecting log4j logs with newer Logstash versions](https://blog.csdn.net/haozhuxuan/article/details/79738447)
[The relationship between Slf4j, log4j, and log4j2, and how to use them](https://blog.csdn.net/Andrew_Yuan/article/details/83010938)
[Configuring log4j2 for Spring MVC](https://blog.csdn.net/smallbabylong/article/details/83475308)
[log4j:configuration](https://blog.csdn.net/jeikerxiao/article/details/78092265)
[Log4J logging integration and configuration in detail](https://www.cnblogs.com/wangzhuxing/p/7753420.html)
[grok-patterns for matching log4j output in Logstash](https://blog.csdn.net/qq_28364999/article/details/82945024)
[https://www.cnblogs.com/Orgliny/p/5592186.html](https://www.cnblogs.com/Orgliny/p/5592186.html)
[Extracting fields from message with filebeat + logstash](https://blog.csdn.net/weixin_33901926/article/details/87495298)
[Setting log output encoding to UTF-8](https://jiangzhengjun.iteye.com/blog/526364)
[Characters that must be escaped in XML](https://blog.csdn.net/chenlycly/article/details/51314686)
[Adding a custom log4j Layout class to produce the desired JSON format](https://blog.csdn.net/lnkToKing/article/details/79563460)
#log4j2.xml
# [log4j's org.apache.log4j.PatternLayout](https://www.cnblogs.com/luoxuan3/p/4200711.html)
{"time":"%d{yyyy-MM-dd HH:mm:ss,SSS}","logtype":"%p","loginfo":"%c:%m"}%n
<Properties>
<property name="LOG_PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.} : %m%n</property>
</Properties>
<Properties>
<Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
</Properties>
<!-- 47.*.*.159 is the Logstash host's public IP; 4560 is the Logstash port -->
<Socket name="logstash-tcp" host="47.*.*.159" port="4560" protocol="TCP">
  <JsonLayout compact="true" eventEol="true" />
  <!-- only one layout is allowed per appender; pattern-based alternatives: -->
  <!-- <PatternLayout pattern="${LOG_PATTERN}" /> -->
  <!-- <PatternLayout charset="UTF-8" pattern="${LOG_PATTERN}"/> -->
</Socket>
# logstash.conf
input {
tcp {
port => 4560
codec => json
}
}
---
config/log4j2-tcp.conf
input {
tcp {
mode => "server"
host => "127.0.0.1"
port => 4567
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
------------
<Configuration>
<Appenders>
<Socket name="Socket" host="localhost" port="12345">
<JsonLayout compact="true" eventEol="true" />
</Socket>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Socket"/>
</Root>
</Loggers>
</Configuration>
---
input {
tcp {
host => "192.168.0.153"
port => 4567
#codec => plain { charset => "GB2312" }
codec => json
}
}
filter {
json {
source => "message"
add_field => { "type" => "%{dtype}" }
remove_field => [ "server", "server.fqdn", "timestamp" ]
}
}
output {
if "_jsonparsefailure" not in [tags] {
stdout { codec => rubydebug }
elasticsearch {
hosts => "192.168.2.181:9200"
}
}
}
A working configuration
#logstash.conf
# For detail structure of this file
# Set: https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
input {stdin{}}
input {
# For detail config for log4j as input,
# See: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
tcp {
mode => "server"
host => "192.168.1.55"
port => 9250
# codec => plain { charset => "UTF-8" } # alternative for plain-text senders
codec => json_lines
}
}
filter {
#Only matched data are sent to output. Sample input: fasfsf 12.23 sdfsa
grok {
match => {
"message" => "(?<msg>.+)\s+(?<request_time>\d+(?:\.\d+)?)\s+(?<message>.+)"
}
}
}
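For the sample input above, the captures come out roughly as follows; note that capturing into the existing message field appends to it, turning it into an array:
"msg"          => "fasfsf"
"request_time" => "12.23"
"message"      => ["fasfsf 12.23 sdfsa", "sdfsa"]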
output {
stdout { codec => rubydebug }
# For detail config for elasticsearch as output,
# See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
elasticsearch {
action => "index" #The operation on ES
hosts => "192.168.1.55:9200" #ElasticSearch host, can be array.
index => "consumer-%{appname}-%{+YYYY.MM.dd}" #The index to write data to.
}
}
------
#logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<springProperty scope="context" name="springAppName" source="spring.application.name"/>
<!-- Log output location within the project -->
<property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>
<!-- Console log output pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<!-- Console appender -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<!--<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %contextName %-5level %logger{50} -%msg%n</pattern>
</encoder>-->
</appender>
<!--<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:9250</destination>
</appender>-->
<!-- Appender that emits JSON-formatted output for Logstash -->
<appender name="logstash_file" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"trace": "%X{X-B3-TraceId:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger",
"rest": "%message"
}
</pattern>
</pattern>
<logstashMarkers/>
<stackTrace>
<throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
<maxDepthPerThrowable>30</maxDepthPerThrowable>
<maxLength>4096</maxLength>
<shortenedClassNameLength>20</shortenedClassNameLength>
<rootCauseFirst>true</rootCauseFirst>
</throwableConverter>
</stackTrace>
</providers>
</encoder>
</appender>
<appender name="logstash"
class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>192.168.1.55:9250</destination>
<!-- an encoder must be configured; several implementations are available -->
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" >
<!-- "appname":"yang_test" 的作用是指定创建索引的名字时用,并且在生成的文档中会多了这个字段 -->
<customFields>{"appname":"springcloud_consume"}</customFields>
</encoder>
<!-- log output encoding -->
<!--<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message",
"appname":"springcloud_consume"
}
</pattern>
</pattern>
</providers>
</encoder>-->
</appender>
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="logstash_file"/>
<appender-ref ref="logstash"/>
</root>
</configuration>
----
<appender name="socketAppender" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="192.168.1.55" /><!-- 远程主机地址 -->
<param name="port" value="9250" />
<param name="Threshold" value="DEBUG" />
<param name="ReconnectionDelay" value="60000" />
<param name="LocationInfo" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{HH:mm:ss.SSS},[%c,%l] %m%n" />
</layout>
</appender>
---
log4j.rootCategory=INFO,Logstash
# Logstash appender
log4j.appender.Logstash=org.apache.log4j.net.SocketAppender
log4j.appender.Logstash.RemoteHost=192.168.1.55
log4j.appender.Logstash.Port=9250
log4j.appender.Logstash.Threshold=INFO
log4j.appender.Logstash.ReconnectionDelay=60000
log4j.appender.Logstash.LocationInfo=true
---
### Log output control: appender D writes the debug log and appender E writes the error log; enable or disable each by adding or removing D and E in rootLogger
log4j.rootLogger = error,stdout,D,E
### console logs ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### debug log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ./logs/debug/debug.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
###error logs ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = ./logs/error/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
Filebeat
# Config file path: "/etc/filebeat/filebeat.yml"
# One config file may contain multiple prospectors, and each prospector may contain multiple paths.
filebeat:
  spool_size: 1024 # buffer up to 1024 events and send them together
  idle_timeout: "5s" # otherwise flush at least every 5 seconds
  registry_file: ".filebeat" # records file read offsets; it is placed in the current working directory, so running filebeat from a different working directory causes logs to be shipped again!
  # an absolute path avoids that problem:
  # registry_file: /var/lib/filebeat/registry
  config_dir: "path/to/configs/contains/many/yaml" # long configs can be split into multiple YAML files loaded from a directory
# List of prospectors to fetch data.
prospectors:
# Each - is a prospector. Below are the prospector specific configurations
-
      fields: # similar to logstash's add_field
        ownfield: "mac"
      ignore_older: "24h" # stop watching files with no new content for 24 hours. On Windows there is also force_close_files, which closes the file handle as soon as the file name changes so the file can be deleted; the drawback is that some log lines may not have been read yet
      scan_frequency: "10s" # rescan the directories every 10 seconds to refresh the list of files matched by the globs
      tail_files: false # whether to start reading from the end of the file
      encoding: "utf-8"
      harvester_buffer_size: 16384 # read 16384 bytes at a time
      backoff: "1s" # check every 1 second whether the file has a new line to read
paths:
        - /var/log/messages # locations of the files to read
        - "/var/log/apache/*" # globs are allowed
        - /var/log/wifi.log
      exclude_files: ["/var/log/apache/error.log"]
      input_type: log # besides "log" there is also "stdin"
      document_type: messages # sets the _type value when writing to ES
      include_lines: ["^ERR", "^WARN"] # only ship lines matching these patterns
      exclude_lines: ["^OK"] # do not ship lines matching these patterns
      multiline: # merge multiple physical lines into one event
pattern: '^[[:space:]]'
negate: false
match: after
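      # With the settings above, lines that begin with whitespace (e.g. the
      # frames of a Java stack trace) are appended to the preceding event. Sketch:
      #   2019-01-01 12:00:00 ERROR something failed
      #       at com.example.Foo.bar(Foo.java:42)   <- leading whitespace, merged into the line above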
-
paths:
- /alidata/log/nginx/access/access.log.json
input_type: log
document_type: nginxacclog
############################# Libbeat Config ##################################
# Base config file used by all other beats for using libbeat features
############################# Output ##########################################
# ship data to redis
output:
redis:
host: "10.122.52.129"
port: 6379
password: "123456"
# ship data to logstash; normally you pick one of the two
logstash:
hosts: ["10.160.8.221:5044"]
############################# Shipper #########################################
shipper:
# tag events with the server name
name: "host_2"
############################# Logging #########################################
logging:
files:
rotateeverybytes: 10485760 # = 10MB
------------------
Filebeat collects the system logs first and forwards them to Logstash for processing; Logstash can be deployed on a separate server just to receive and process what Filebeat sends. Filebeat must be configured to output to Logstash (any other outputs must be commented out):
]# sed -n 91,94p /etc/filebeat/filebeat.yml
#-------------------Logstash output----------------------
output.logstash:
# The Logstash hosts
hosts: ["10.0.0.13:5044"]
After processing the logs received from Filebeat, Logstash sends them on to Elasticsearch. The Logstash configuration is:
#] cat /etc/logstash/conf.d/test.conf
input {
beats {
host => '0.0.0.0'
port => 5044
}
}
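The input above is only half of test.conf; a minimal remainder of the pipeline might look like this (a sketch; the Elasticsearch address is hypothetical):
output {
  elasticsearch {
    hosts => ["10.0.0.13:9200"] # replace with your ES address
    index => "filebeat-%{+YYYY.MM.dd}"
  }
}
The grok patterns below (a few custom ones first, then the stock grok-patterns set) can be saved in a file under patterns_dir: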
DATE1 [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}
LEVEL (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
JAVA_SOURCE [a-zA-Z.<>():0-9]*
JAVASOURCE (?:[a-zA-Z.,<>():0-9]*)
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
TIMESTAMP_ISO8602 %{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
TIMESTAMP_ISO %{TIMESTAMP_ISO8601}|%{TIMESTAMP_ISO8602}
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
input {stdin{}}
filter {
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => { "message" => "%{TIMESTAMP_ISO:curtime} %{LOGLEVEL:level} %{JAVASOURCE:javasource} %{GREEDYDATA:logmessage}" }
}
}
output {stdout{codec => rubydebug}}
---
15:32:56.994 INFO com.fasf.aspect.ControllerAspect,com.fasf.aspect.ControllerAspect.log(ControllerAspect.java:61) ---after[1ffasf]---
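Run through the filter above, that sample line produces roughly these captures:
"curtime"    => "15:32:56.994"
"level"      => "INFO"
"javasource" => "com.fasf.aspect.ControllerAspect,com.fasf.aspect.ControllerAspect.log(ControllerAspect.java:61)"
"logmessage" => "---after[1ffasf]---"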
----------------
input {stdin{}}
input {
# For detail config for log4j as input,
# See: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
tcp {
mode => "server"
host => "192.168.1.55"
port => 9250
#codec => json_lines
#codec => plain { charset => "UTF-8" }
}
}
input {
beats {
host => '192.168.1.55'
port => 9251
}
}
filter {
#Only matched data are sent to output.
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{DATE1:time1} %{JAVA_SOURCE:source1} %{LEVEL:level1} %{JAVALOGMESSAGE:doc}" }
}
mutate {
rename => { "[host][name]" => "host" }
}
#drop unused fields (these mutate blocks are independent examples; filters run in order, so removing message here would break the split below)
mutate {
remove_field => "message"
remove_field => "mydate"
remove_field => "@version"
remove_field => "host"
remove_field => "path"
}
#convert two fields to integers
mutate{
convert => { "size" => "integer" }
convert => { "attachments" => "integer" }
}
#strip carriage returns
mutate{
gsub => [ "message", "\r", "" ]
}
#split on commas
mutate {
split => ["message",","]
}
#after splitting, name and assign the fields
mutate{
add_field => {
"id" => "%{[message][0]}"
"mydate" => "%{[message][1]}"
"user" => "%{[message][2]}"
"pc" => "%{[message][3]}"
"to_user" => "%{[message][4]}"
"cc" => "%{[message][5]}"
"bcc" => "%{[message][6]}"
"from_user" => "%{[message][7]}"
"size" => "%{[message][8]}"
"attachments" => "%{[message][9]}"
"content" => "%{[message][10]}"
}
}
#parse the date in the field, convert the time zone, and write the result to date
date {
match => [ "mydate", "MM/dd/yyyy HH:mm:ss" ]
target => "date"
locale => "en"
timezone => "+00:00"
}
}
output {
stdout { codec => rubydebug }
# For detail config for elasticsearch as output,
# See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
elasticsearch {
action => "index" #The operation on ES
hosts => "192.168.1.55:9200" #ElasticSearch host, can be array.
index => "consumer-%{appname}-%{+YYYY.MM.dd}" #The index to write data to.
}
}