一般logstash更常用于ELK日志监控系统,作为数据管道
Logstash:接收,处理,转发日志
Elasticsearch:文档型数据库,实时的分布式搜索和分析引擎
Kibana:查询、生成报表的web GUI
Logstash 有两个必要元素:input 和 output ,一个可选元素:filter。
input:采集数据,指定数据来源。可以是文件路径,也可以是kafka的某个topic、redis。
filter:筛选、处理数据,剔除多余的数据,生成想要的索引。
output:将数据传输到目的地,可以是具体文件路径,也可以是kafka、redis。
配置
在安装目录bin文件夹里新建一个logstash.conf,用来配置上述所说的输出输出等,将会覆盖logstash.yml默认设置。
一般配置如下:
# 输入输入
input { stdin {} }
# 数据处理
filter {
grok {
match => ["message", "%{COMBINEDAPACHELOG}"]
}
}
# 数据输出
output { stdout { codec => rubydebug } }
配置文件语法类似Ruby
filter
filter中的配置,基本格式如下,plugin都是官方提供的,针对文本做处理,比如正则表达式啊(grok),按固定的格式做切分(kv)等等。选择正确的plugin可以更快速的帮助你解析日志
filter {
plugin {
XX => "YY"
}
if expression {
plugin {
XX => "YY"
}
} else if expression {
plugin {
XX => "YY"
}
} else {
plugin {
XX => "YY"
}
}
plugin {
XX => "YY"
}
}
关于插件
1、grok:正则表达式插件,功能强大,有许多内置的pattern
以何种规则从字符串中提取出结构化的信息,grok是logstash里的一款插件,可以使用正则表达式匹配日志,上文中的%{COMBINEDAPACHELOG}是内置的正则,用来匹配apache access日志
默认正则:
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([A-a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
2、mutate:字段的CRUD操作,比如替换内容,截取等等
mutate {
split=>{"error_request_info"=>",fromuid"}
add_field=>{"error_uri"=>"%{error_request_info[0]}"}
remove_field=>["error_request_info"]
}
3、kv:key-value插件,非常适合URL参数解析一类的具有固定分隔符的日志
#比如解析URL的querystring: a=1&b=2&c=3
filter {
kv {
field_split => "&"
}
}
4、json:将json字符串直接转换成对应的key-value
#比如日志为:xxxxxxxxx:xxxxxx&result={"data":"1"}:xxxxxx
filter {
json {
#假设数据通过grok预处理,将result内容捕获
source => "result"
}
}
#通过json encode后
{
"data": "1"
}
5、drop:直接丢掉本行日志,过滤不符合要求的日志
if "/app/log/logfront" in [content] {
# 特定的处理
drop {}
}
output
配置保存解析结果
- elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
- file:将事件数据写入文件或磁盘。
- statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过UDP发送并将聚合发送到一个或多个可插入的后端服务)