Logstash syntax

Log4j2 and Logstash configuration

Logstash grok self-test examples + common regexes
https://blog.csdn.net/qq_34646817/article/details/81232121#t1
grokdebug
Grok Debugger: local installation and configuration
https://blog.csdn.net/u013274150/article/details/82415818

Regular expressions
Regular expression handbook

https://www.cnblogs.com/kevin-yuan/archive/2012/11/11/2765340.html

Extract the digits in the string da12bka3434bdca4343bdca234bm that lie between the characters a and b, where the character before the a must not be c, and the character after the b must be d.

Clearly only 3434 qualifies here. So how do we extract it?

First, write a regex that matches the whole context, with a capture group around the digits: [^c]a(\d*)bd

Then rewrite it with zero-width lookarounds, so that the match itself contains only the digits: (?<=[^c]a)\d*(?=bd)
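To sanity-check the lookaround version, the grok filter (which uses the Oniguruma regex engine, so lookarounds work) makes a quick harness; the field name num is just for illustration:

input { stdin {} }
filter {
    grok {
        # paste da12bka3434bdca4343bdca234bm into stdin; num captures 3434
        match => { "message" => "(?<num>(?<=[^c]a)\d*(?=bd))" }
    }
}
output { stdout { codec => rubydebug } }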

Basic syntax

=~ matches regex
!~ does not match regex
in, not in
and, or
nand (not-and), xor (exclusive-or)

#Field references
[name][status]       # in conditionals
%{[name][status]}    # inside sprintf strings; ${VAR} is reserved for environment variables
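A small sketch combining the operators above with both reference styles (field names are illustrative):

output {
    # conditionals take bare [field][subfield] references
    if [name][status] == "active" and [loglevel] in ["ERROR", "FATAL"] {
        # sprintf strings take %{[field][subfield]}
        stdout { codec => line { format => "status=%{[name][status]}" } }
    }
}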

Plugins

yum install -y gem
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l


bin/plugin install|update|uninstall logstash-input-jdbc   # newer releases use bin/logstash-plugin instead

# (installed plugins actually live under the vendor/bundle/jruby/1.9/gems/ directory)
bin/plugin list


# local plugin install; on success a new section is appended to the Gemfile
bin/logstash-plugin install /path/to/logstash-filter-crash.gem

bin/plugin install logstash-input-log4j2


Plugin downloads

RubyGems mirror
https://gems.ruby-china.com/

Testing

# run with debug logging enabled
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/logstash-simple.conf   --debug | tee debug.log

/usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/logback-es.conf


less file 
nl   file | grep key

# emit formatted text (-n: no trailing newline, -e: interpret escapes)
echo -ne '888888888888'

cat randdata | awk '{print $2}' | sort | uniq -c | tee sortdata

top -H   # show individual threads (upper-case H; -h only prints help)

# replace text
sed 's/upstreamtime":-/upstreamtime":0/'

Rsyslog


# import existing data into Logstash (pairs with the tcp input example below)
nc 127.0.0.1 8888 < olddata

Configuration

When importing pre-existing data into Elasticsearch, you will usually also need the filter/date plugin to rewrite the default "@timestamp" field, as sketched below.
https://elkguide.elasticsearch.cn/logstash/plugins/filter/dissect.html
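A minimal sketch of that rewrite, assuming a logdate field has already been extracted by grok:

filter {
    date {
        # parse logdate and write the result into @timestamp (the default target)
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
        remove_field => ["logdate"]
    }
}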

bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
---
input {
    stdin { }
}
output {
    stdout { }
}

input {stdin{}}
output {stdout{codec => rubydebug}}


---
input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}
---
input {
    file {
        path => ["/var/log/**/*.log", "/var/log/message"] #绝对路径
        type => "system"
        start_position => "beginning"
        sincedb_path => /dev/null  #每次重启自动从头开始读
    }
}
---
input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}
---
input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}
---
#nginx.conf
log_format json '{"@timestamp":"$time_iso8601",'
               '"@version":"1",'
               '"host":"$server_addr",'
               '"client":"$remote_addr",'
               '"size":$body_bytes_sent,'
               '"responsetime":$request_time,'
               '"domain":"$host",'
               '"url":"$uri",'
               '"status":"$status"}';
access_log /var/log/nginx/access.log_json json;
# logstash.conf
input {
    file {
        path => "/var/log/nginx/access.log_json"
        codec => "json"
    }
}
---
filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}
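A runnable sketch tying grok and date together, using the stock COMMONAPACHELOG pattern (it captures the request time into a timestamp field):

input {stdin{}}
filter {
    grok {
        match => ["message", "%{COMMONAPACHELOG}"]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}
output {stdout{codec => rubydebug}}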
---
input {stdin{}}
filter {
    grok {
        # Shared grok expressions can be kept in one place and pointed to
        # with the patterns_dir option, e.g. /etc/logstash/patterns.d/ containing:
        #   # common postfix patterns
        #   POSTFIX_QUEUEID ([0-9A-F]{6,}|[0-9a-zA-Z]{15,}|NOQUEUE)
        #   # helper patterns
        #   GREEDYDATA_NO_COLON [^:]*
        patterns_dir => ["/path/to/your/own/patterns"]
        # Named captures use (?<name>pattern); (?m) lets the pattern match across newlines.
        # See also: [regex (?P<group>) syntax](https://blog.csdn.net/lc574260570/article/details/82701295)
        match => [
            "message", "\s+(?<request_time>\d+(?:\.\d+)?)\s+",
            "message", "(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+",
            "message", "%{SYSLOGBASE} %{DATA:message}",
            "message", "(?<date>\d{2}/\d{2}/\d{2})\s(?<time>\d{2}:\d{2}:\d{2},\d{3})\s(?<message>.+)"
        ]
        remove_field => ["message"]  # drop the message field after matching
        overwrite => ["message"]     # or overwrite the default message field in place
    }
}
output {stdout{codec => rubydebug}}
{
         "message" => "begin 123.456 end",
        "@version" => "1",
      "@timestamp" => "2014-08-09T11:55:38.186Z",
            "host" => "raochenlindeMacBook-Air.local",
    "request_time" => "123.456"
}
---
filter {
    grok {
        match => {
            #%{PATTERN_NAME:capture_name:data_type}    data_type currently supports only two values: int and float
            "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
        }
    }
}
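grok itself only casts to int and float; for anything else, or after the fact, mutate/convert does the same job:

filter {
    mutate {
        # string -> number conversion after extraction
        convert => { "request_time" => "float" }
    }
}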
---
filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
            # e.g. http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
        }
        convert_datatype => {
            pid => "int"
        }
    }
}
#http://rizhiyi.com/index.do?id=123   http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
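Applied to the sample URL in the comment above: %{?name} discards a piece, and the %{?key}=%{&key} pair promotes a captured value ("id") into a field name. A rough sketch:

filter {
    dissect {
        # input:  http://rizhiyi.com/index.do?id=123
        # result: domain => "rizhiyi.com", id => "123"
        mapping => {
            "message" => "http://%{domain}/%{?url}?%{?arg1}=%{&arg1}"
        }
    }
}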
---
output {
    elasticsearch {
        hosts => ["192.168.0.2:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}" #索引名中不能有大写字母。    以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串       
        document_type => "%{type}"
        flush_size => 20000  #攒到 20000 条数据一次性发送出去
        idle_flush_time => 10 #如果 10 秒钟内也没攒够 20000 条,Logstash 还是会以当前攒到的数据量发一次
        sniffing => true
        template_overwrite => true
    }
}



------Log4J
<appender name="LOGSTASH" class="org.apache.log4j.net.SocketAppender">
    <param name="RemoteHost" value="logstash_hostname" />
    <param name="ReconnectionDelay" value="60000" />
    <param name="LocationInfo" value="true" />
    <param name="Threshold" value="DEBUG" />
</appender>
<root>
    <level value="INFO"/>
    <appender-ref ref="OTHERPLACE"/>
    <appender-ref ref="LOGSTASH"/>
</root>
---
log4j.rootLogger=DEBUG, logstash

###SocketAppender###
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4560
log4j.appender.logstash.RemoteHost=logstash_hostname
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
---
log4j.rootLogger=INFO,console

# for package com.demo.elk, log would be sent to socket appender.
log4j.logger.com.demo.elk=DEBUG, socket

# appender socket
log4j.appender.socket=org.apache.log4j.net.SocketAppender
log4j.appender.socket.Port=4567
log4j.appender.socket.RemoteHost=centos2
log4j.appender.socket.layout=org.apache.log4j.PatternLayout
log4j.appender.socket.layout.ConversionPattern=%d [%-5p] [%l] %m%n
log4j.appender.socket.ReconnectionDelay=10000

# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%l] %m%n
----
input {
  log4j {
    type => "log4j-json"
    port => 4560
  }
}

------------------
[https://www.cnblogs.com/linjiqin/p/10757261.html](https://www.cnblogs.com/linjiqin/p/10757261.html)
[https://segmentfault.com/a/1190000016192394?utm_source=tag-newest](https://segmentfault.com/a/1190000016192394?utm_source=tag-newest)
[https://blog.csdn.net/BianChengNinHao/article/details/83503295](https://blog.csdn.net/BianChengNinHao/article/details/83503295)
[https://www.cnblogs.com/xing901022/p/4830684.html](https://www.cnblogs.com/xing901022/p/4830684.html)
[Collecting log4j logs with newer versions of Logstash](https://blog.csdn.net/haozhuxuan/article/details/79738447)
[How Slf4j relates to log4j and log4j2, and how to use them](https://blog.csdn.net/Andrew_Yuan/article/details/83010938)
[Configuring log4j2 for Spring MVC](https://blog.csdn.net/smallbabylong/article/details/83475308)

[log4j:configuration](https://blog.csdn.net/jeikerxiao/article/details/78092265)
[Log4J logging integration and configuration in detail](https://www.cnblogs.com/wangzhuxing/p/7753420.html)

[Matching log4j output with Logstash grok-patterns](https://blog.csdn.net/qq_28364999/article/details/82945024)
[https://www.cnblogs.com/Orgliny/p/5592186.html](https://www.cnblogs.com/Orgliny/p/5592186.html)

[filebeat + logstash: extracting named fields from message](https://blog.csdn.net/weixin_33901926/article/details/87495298)

[Setting the log output encoding to UTF-8](https://jiangzhengjun.iteye.com/blog/526364)

[Characters that must be escaped in XML](https://blog.csdn.net/chenlycly/article/details/51314686)

[Custom log4j Layout class producing the desired JSON format](https://blog.csdn.net/lnkToKing/article/details/79563460)

#log4j2.xml
# [log4j's org.apache.log4j.PatternLayout](https://www.cnblogs.com/luoxuan3/p/4200711.html)

# a PatternLayout pattern that renders each event as JSON:
{"time":"%d{yyyy-MM-dd HH:mm:ss,SSS}","logtype":"%p","loginfo":"%c:%m"}%n
    <Properties>
        <property name="LOG_PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.} : %m%n</property>
    </Properties>
    <Properties>
        <Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
    </Properties>
    <!-- 47.*.*.159 is the logstash host's public IP, 4560 the logstash port -->
    <Socket name="logstash-tcp" host="47.*.*.159" port="4560" protocol="TCP">
        <!-- pick exactly one of these layouts: -->
        <JsonLayout compact="true" eventEol="true" />
        <PatternLayout pattern="${LOG_PATTERN}" />
        <PatternLayout charset="UTF-8" pattern="${log_pattern}"/>
    </Socket>
# logstash.conf
input {
    tcp {  
        port => 4560 
        codec => json
    }  
}
---
config/log4j2-tcp.conf
input {
    tcp {
        mode => "server"
        host => "127.0.0.1"
        port => 4567
    }
}
filter {
    json {
        source => "message"
    }
}
output {
    stdout {
        codec => rubydebug
    }
} 


------------
<Configuration>
  <Appenders>
     <Socket name="Socket" host="localhost" port="12345">
       <JsonLayout compact="true" eventEol="true" />
    </Socket>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Socket"/>
    </Root>
  </Loggers>
</Configuration>
---

input {
    tcp {
        host => "192.168.0.153"
        port => 4567
        #codec => plain { charset => "GB2312" }
        codec => json
    }
}
 
filter {
    json {
        source => "message"
        add_field => ["type", "%{dtype}"]
        remove_field => [ "server", "server.fqdn", "timestamp" ]
    }
}
 
output {
 
    if "_jsonparsefailure" not in [tags] {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => "192.168.2.181:9200"
        }
    }
}

A working configuration

#logstash.conf
# For the detailed structure of this file
# See: https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
input {stdin{}}
input {
  # For detail config for log4j as input,
  # See: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
  tcp {
    mode => "server"
    host => "192.168.1.55"
    port => 9250
    #codec => plain { charset => "UTF-8" }   # pick one codec
    codec => json_lines
  }
}
filter {
  # Only matched data are sent to output. Sample input: fasfsf 12.23 sdfsa
  grok {
    match => {
      "message" => "(?<msg>.+)\s+(?<request_time>\d+(?:\.\d+)?)\s+(?<message>.+)"
    }
  }
}
output {

  stdout { codec => rubydebug }


  # For detail config for elasticsearch as output,
  # See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
  elasticsearch {
    action => "index"          #The operation on ES
    hosts  => "192.168.1.55:9200"   #ElasticSearch host, can be array.
    index  => "consumer-%{appname}-%{+YYYY.MM.dd}"         #The index to write data to.
  }
}
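The index sprintf consumer-%{appname}-... only works if every event actually carries an appname field (below it comes from the logback encoder's customFields); a guard for events that lack it might look like:

filter {
    # avoid a literal "%{appname}" in index names when the field is missing
    if ![appname] {
        mutate { add_field => { "appname" => "unknown" } }
    }
}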
------
#logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <!-- where the project writes its log files -->
    <property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>
    <!-- console log output pattern -->
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <!-- console appender -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
          <!--<encoder>
              <pattern>%d{yyyy-MM-dd HH:mm:ss} %contextName %-5level %logger{50} -%msg%n</pattern>
          </encoder>-->
    </appender>

    <!--<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:9250</destination>
    </appender>-->

    <!-- appender that emits JSON for logstash -->
    <appender name="logstash_file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_FILE}.json</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity": "%level",
                        "trace": "%X{X-B3-TraceId:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
                <logstashMarkers/>
                <stackTrace>
                    <throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
                        <maxDepthPerThrowable>30</maxDepthPerThrowable>
                        <maxLength>4096</maxLength>
                        <shortenedClassNameLength>20</shortenedClassNameLength>
                        <rootCauseFirst>true</rootCauseFirst>
                    </throwableConverter>
                </stackTrace>
            </providers>
        </encoder>
    </appender>

    <appender name="logstash"
              class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>192.168.1.55:9250</destination>
        <!-- an encoder is required; several implementations are available -->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" >
            <!-- "appname":"yang_test" 的作用是指定创建索引的名字时用,并且在生成的文档中会多了这个字段  -->
            <customFields>{"appname":"springcloud_consume"}</customFields>
        </encoder>
        <!-- log output encoding -->
        <!--<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity": "%level",
                        "service": "${springAppName:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message",
                        "appname":"springcloud_consume"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>-->
    </appender>

    <root level="INFO">
        <appender-ref ref="console"/>
        <appender-ref ref="logstash_file"/>
        <appender-ref ref="logstash"/>
    </root>
</configuration>


----


    <appender name="socketAppender" class="org.apache.log4j.net.SocketAppender">
        <param name="remoteHost" value="192.168.1.55" /><!-- 远程主机地址 -->
        <param name="port" value="9250" />
        <param name="Threshold" value="DEBUG" />
        <param name="ReconnectionDelay" value="60000" />
        <param name="LocationInfo" value="true" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{HH:mm:ss.SSS},[%c,%l] %m%n" />
        </layout>
    </appender>

---
log4j.rootCategory=INFO,Logstash

# Logstash appender
log4j.appender.Logstash=org.apache.log4j.net.SocketAppender
log4j.appender.Logstash.RemoteHost=192.168.1.55
log4j.appender.Logstash.port=9250
log4j.appender.Logstash.Threshold=INFO
log4j.appender.Logstash.ReconnectionDelay=60000
log4j.appender.Logstash.LocationInfo=true
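Note that org.apache.log4j.net.SocketAppender ships serialized Java LoggingEvent objects, not text, so the natural receiver is the dedicated log4j input (as in the earlier example) rather than a plain tcp input. A minimal sketch:

input {
    log4j {
        mode => "server"
        host => "0.0.0.0"
        port => 9250
    }
}
output { stdout { codec => rubydebug } }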
---

### Log output control: D toggles the DEBUG file appender, E the ERROR file appender; enable or disable them manually by listing D and E on the rootLogger line ###
log4j.rootLogger = error,stdout,D,E
 
### console logs ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
 
### debug log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ./logs/debug/debug.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG 
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
 
###error logs ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = ./logs/error/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR 
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

Filebeat

# Config file path: "/etc/filebeat/filebeat.yml"
# One config file may contain several prospectors, and each prospector may list several paths.
filebeat:
  spool_size: 1024                                  # send once 1024 events have accumulated
  idle_timeout: "5s"                                # otherwise send at least every 5 seconds
  registry_file: /var/lib/filebeat/registry         # records read offsets; a relative value such as ".filebeat" resolves against the current working directory, so running filebeat from a different directory causes logs to be shipped again!
  config_dir: "path/to/configs/contains/many/yaml"  # if the config grows too long, split it across files loaded from a directory
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      fields:
        ownfield: "mac"                             # similar to logstash's add_field
      ignore_older: "24h"                           # stop watching files with no new content for 24h; on Windows there is also force_close_files, which closes the handle as soon as the file name changes so the file can be deleted, at the risk of leaving some lines unread
      scan_frequency: "10s"                         # rescan the directories every 10s and refresh the list of files matching the globs
      tail_files: false                             # whether to start reading from the end of the file
      encoding: "utf-8"
      harvester_buffer_size: 16384                  # read 16384 bytes at a time
      backoff: "1s"                                 # check every 1s whether the file has a new line to read
      paths:
        - /var/log/messages                         # where to read files from
        - "/var/log/apache/*"                       # globs are allowed
        - /var/log/wifi.log
      exclude_files: ["/var/log/apache/error.log"]
      input_type: log                               # besides "log" there is also "stdin"
      document_type: messages                       # the _type value used when writing to ES
      include_lines: ["^ERR", "^WARN"]              # only ship lines matching these patterns
      exclude_lines: ["^OK"]                        # never ship lines matching these patterns
      multiline:                                    # merge multi-line events
        pattern: '^[[:space:]]'
        negate: false
        match: after
    -
      paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
############################# Libbeat Config ##################################
# Base config file used by all other beats for using libbeat features

############################# Output ##########################################

# ship data to redis
output:
  redis:
    host: "10.122.52.129"
    port: 6379
    password: "123456"
  # or ship data to logstash; normally only one of the two is used
  logstash:
    hosts: ["10.160.8.221:5044"]
############################# Shipper #########################################
shipper:
  # tag events with this server's name
  name: "host_2"
############################# Logging #########################################
logging:
  files:
    rotateeverybytes: 10485760 # = 10MB
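For the redis branch above, the matching Logstash side reads the same list back; the key name "filebeat" is an assumption, since the key the beat writes to is not shown here:

input {
    redis {
        host => "10.122.52.129"
        port => 6379
        password => "123456"
        data_type => "list"
        key => "filebeat"   # assumed; must match the key the beat writes to
    }
}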



------------------
Filebeat collects the system logs first and forwards them to Logstash for processing; Logstash can be deployed on its own server dedicated to receiving and processing what Filebeat sends. Filebeat must be configured to send its logs to Logstash (any other outputs must be commented out):

]# sed -n 91,94p /etc/filebeat/filebeat.yml
#-------------------Logstash output----------------------
output.logstash:
  # The Logstash hosts
  hosts: ["10.0.0.13:5044"]

Logstash receives the logs sent by Filebeat, finishes processing them, and then forwards them to Elasticsearch. The Logstash configuration:

#] cat /etc/logstash/conf.d/test.conf
input {
        beats {
                host => '0.0.0.0'
                port => 5044
        }
}
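test.conf above only shows the input side; since the text says Logstash then forwards the processed logs to Elasticsearch, a minimal completing output stanza (host assumed to be the same machine) might be:

output {
    elasticsearch {
        hosts => ["10.0.0.13:9200"]
        index => "filebeat-%{+YYYY.MM.dd}"
    }
}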




https://blog.51cto.com/tchuairen/1840596

DATE1    [0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}
LEVEL   (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
JAVA_SOURCE [a-zA-Z.<>():0-9]*
JAVASOURCE (?:[a-zA-Z.,<>():0-9]*)



USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?

TIMESTAMP_ISO8602 %{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
TIMESTAMP_ISO %{TIMESTAMP_ISO8601}|%{TIMESTAMP_ISO8602}

DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
input {stdin{}}
filter {
    grok {
          patterns_dir => ["/etc/logstash/conf.d/patterns"]
          match => { "message" => "%{TIMESTAMP_ISO:curtime} %{LOGLEVEL:level} %{JAVASOURCE:javasource} %{GREEDYDATA:logmessage}" }
        }
}
output {stdout{codec => rubydebug}}
---

15:32:56.994 INFO com.fasf.aspect.ControllerAspect,com.fasf.aspect.ControllerAspect.log(ControllerAspect.java:61) ---after[1ffasf]---
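Running the sample line above through the filter from the previous block should yield roughly:

{
        "curtime" => "15:32:56.994",
          "level" => "INFO",
     "javasource" => "com.fasf.aspect.ControllerAspect,com.fasf.aspect.ControllerAspect.log(ControllerAspect.java:61)",
     "logmessage" => "---after[1ffasf]---"
}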



----------------


input {stdin{}}
input {
  # For detail config for log4j as input,
  # See: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
  tcp {
    mode => "server"
    host => "192.168.1.55"
    port => 9250
    #codec => json_lines
    #codec => plain { charset => "UTF-8" }
  }
}
input {
 beats {
   host => '192.168.1.55'
   port => 9251
  }
}
filter {
  # Only matched data are sent to output.
    grok {
        patterns_dir => ["./patterns"]
        match => { "message" => "%{DATE1:time1} %{JAVA_SOURCE:source1} %{LEVEL:level1} %{JAVALOGMESSAGE:doc}" }
    }

  mutate {
    rename => { "[host][name]" => "host" }
  }

    # drop unused fields
    mutate {
        remove_field => "message"
        remove_field => "mydate"
        remove_field => "@version"
        remove_field => "host"
        remove_field => "path"
    }
    # convert two fields to integers
    mutate {
        convert => { "size" => "integer" }
        convert => { "attachments" => "integer" }
    }
    # strip carriage returns
    mutate {
        gsub => [ "message", "\r", "" ]
    }

    # split on commas
    mutate {
        split => ["message", ","]
    }

    # after splitting, name and assign the fields
    mutate {
        add_field => {
            "id" => "%{[message][0]}"
            "mydate" => "%{[message][1]}"
            "user" => "%{[message][2]}"
            "pc" => "%{[message][3]}"
            "to_user" => "%{[message][4]}"
            "cc" => "%{[message][5]}"
            "bcc" => "%{[message][6]}"
            "from_user" => "%{[message][7]}"
            "size" => "%{[message][8]}"
            "attachments" => "%{[message][9]}"
            "content" => "%{[message][10]}"
        }
    }

    # parse the date in the field, convert the time zone, and store the result in date
    date {
        match => [ "mydate", "MM/dd/yyyy HH:mm:ss" ]
        target => "date"
        locale => "en"
        timezone => "+00:00"
    }
}
output {

  stdout { codec => rubydebug }


  # For detail config for elasticsearch as output,
  # See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
  elasticsearch {
    action => "index"          #The operation on ES
    hosts  => "192.168.1.55:9200"   #ElasticSearch host, can be array.
    index  => "consumer-%{appname}-%{+YYYY.MM.dd}"         #The index to write data to.
  }
}



