ELK快速入门(03)Logstash高级配置

前面我们搭建了一个简单的ELK日志收集系统,可以看到其中的Logstash起的作用。Logstash 是一个实时数据收集引擎,可收集各类型数据并对其进行分析,过滤和归纳。按照自己条件分析过滤出符合数据导入到可视化界面。Logstash的功能很强大,远不止一个input和一个output那几行配置起的作用。下面介绍Logstash在经典场景中的用法。










简单模式:以logstash作为日志搜索器

架构:logstash采集、处理、转发到elasticsearch存储,在kibana进行展示

特点:这种结构因为需要在各个服务器上部署 Logstash,而它比较消耗 CPU 和内存资源,所以比较适合计算资源丰富的服务器,否则容易造成服务器性能下降,甚至可能导致无法正常工作(因为Logstash有点重)。

input和output的配置是比较灵活的,首先看最简单的一种,从控制台输入日志,然后输出到控制台:

input { stdin { } }

output{ stdout { } }

前面的内容介绍了从文件读取,输出到es当中,输出也可以到文件中:

output {

    #输出到文件

    file {

        path => "/logs/app/logstash/all.log" #指定写入文件路径

        flush_interval => 0                  # 指定刷新间隔,0代表实时写入

        codec => json

     }

}

简单模式基本上能满足百分之八九十的公司的日志业务了,下面介绍的几种模式适用于日志量十分庞大的系统。











安全模式:beats(FilebeatMetricbeatPacketbeatWinlogbeat等)作为日志搜集器

因为Logstash有点重,所以就有了beats。Beats 是一个面向轻量型采集器的平台,这些采集器可从边缘机器发送数据。我们从文件收集日志常用的是filebeat。下面是几种beats:

Packetbeat(搜集网络流量数据);

Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);

Filebeat(搜集文件数据)-------最常用

Winlogbeat(搜集 Windows 事件日志数据)。

从架构图可以看到,我们的应用服务器收集日志的不再是Logstash,而且对应的beats,Beats 将搜集到的数据发送到 Logstash,经 Logstash 解析、过滤后,将其发送到 Elasticsearch 存储,并由 Kibana 呈现给用户。

这种架构解决了 Logstash 在各服务器节点上占用系统资源高的问题。相比 Logstash,Beats 所占系统的 CPU 和内存几乎可以忽略不计。另外,Beats 和 Logstash 之间支持 SSL/TLS 加密传输,客户端和服务器双向认证,保证了通信安全。因此这种架构适合对数据安全性要求较高,同时各服务器性能比较敏感的场景。 

来看一个filebeat的配置:

#=========== Filebeat prospectors ===========

filebeat.prospectors: 

- input_type: log

 paths:

    - /home/admin/helloworld/logs/*.log

#--------------------------- Logstash output --------------------------------

output.logstash:

 hosts: ["192.168.80.34:5044"]

Logstash的input配置:

input {

     beats {

        port => 5044

        codec => "json"

    }

}

Logstash的output配置:

output {

 输出到控制台

    # stdout { }

 输出到redis

    redis {

        host => "192.168.80.32"   # redis主机地址

        port => 6379              # redis端口号

        password => "123456"          # redis 密码

        #db => 8                   # redis数据库编号

        data_type => "channel"    # 使用发布/订阅模式

        key => "logstash_list_0"  # 发布通道名称

}

#输出到kafka

    kafka {

        bootstrap_servers => "192.168.80.42:9092"

        topic_id         => "test" 

       }

#输出到es

elasticsearch {

        hosts => "node18:9200"

        codec => json

        }

}

从上面可以看出,Logstash不仅可以输出到es和控制台,还可以输出到redis缓存中和kafka中,通过配置可以实现强大的数据传输功能。










消息模式:Beats 还不支持输出到消息队列新版本除外:5.0版本及以上

在消息队列前后两端只能是 Logstash 实例。logstash从各个数据源搜集数据,不经过任何处理转换仅转发出到消息队列(kafka、redis、rabbitMQ等),后logstash从消息队列取数据进行转换分析过滤,输出到elasticsearch,并在kibana进行图形化展示。

架构(Logstash进行日志解析所在服务器性能各方面必须要足够好):

模式特点这种架构适合于日志规模比较庞大的情况。但由于 Logstash 日志解析节点和 Elasticsearch 的负荷比较重,可将他们配置为集群模式,以分担负荷。引入消息队列,均衡了网络传输,从而降低了网络闭塞,尤其是丢失数据的可能性,但依然存在 Logstash 占用系统资源过多的问题

工作流程:Filebeat采集—>  logstash转发到kafka—>  logstash处理从kafka缓存的数据进行分析—>  输出到es—>  显示在kibana

从服务器接收日志的Logstash的配置:

input {

    beats {

    port => 5044

    codec => "json"

       }

    syslog{

       }

}

output {

    # 输出到控制台

    # stdout { }

    # 输出到redis

    redis {

        host => "192.168.80.32"   # redis主机地址

        port => 6379              # redis端口号

        password => "123456"          # redis 密码

       #db => 8                   # redis数据库编号

        data_type => "channel"    # 使用发布/订阅模式

        key => "logstash_list_0"  # 发布通道名称

    }

  #输出到kafka

    kafka {

        bootstrap_servers => "192.168.80.42:9092"

        topic_id          => "test" 

       }     

}

从kafka接收日志到es的Logstash的配置:

input{

    kafka {

           bootstrap_servers => "192.168.80.42:9092"

           topics          => ["test"]

           #decroate_events   => true

           group_id          => "consumer-test"(消费组)

           #decroate_events  => true

            auto_offset_reset => "earliest"(初始消费,相当于from beginning,不设置,相当于是监控启动后的kafka的消息生产)

        }

}

output {

 elasticsearch {

       hosts => "192.168.80.18:9200"   

       codec => json

       }

}











消息模式:logstash从kafka消息队列直接读取数据并处理、输出到es(因为从kafka内部直接读取,相当于是已经在缓存内部,直接logstash处理后就可以进行输出,输出到文件、es等)

工作模式:【数据已存在kafka对应主题内】单独的logstash,kafka读取,经过处理输出到es并在kibana进行展示

Logstash配置如下:

input{

    kafka {

          bootstrap_servers => "192.168.80.42:9092"

            topics          => ["test"]

           group_id       => "consumer-test"

           #decroate_events  => true

            auto_offset_reset => "earliest"

     }

} 

output {

       elasticsearch {

       hosts => "192.168.80.18:9200"

       codec => json

       }

}













filebeat新版本(5.0以上)支持直接支持输出到kafka,而无需经过logstash接收转发到kafka

Filebeat采集完毕直接入到kafka消息队列,进而logstash取出数据,进行处理分析输出到es,并在kibana进行展示。

filebeat配置: 

#======== Filebeat prospectors=================

filebeat.prospectors: 

- input_type: log

 paths:

    - /home/admin/helloworld/logs/*.log 

#-----------------------------kafka  output-----------------------------------

output.kafka:

  hosts: ["192.168.80.42:9092"]

  topic: test

  required_acks: 1

Logstash的配置:

input{

    kafka {

        bootstrap_servers => "192.168.80.42:9092"

            topics          => ["test"]

         group_id       => "consumer-test"

         #decroate_events  => true

       auto_offset_reset => "earliest"

   }

}

output {

       elasticsearch {

       hosts => "192.168.80.18:9200"

       codec => json

       }

}











SSL加密传输(增强安全性,仅配置了秘钥和证书的filebeat服务器和logstash服务器才能进行日志文件数据的传输)

Logstash的配置文件:

ssl_certificate_authorities :filebeat端传来的证书所在位置

ssl_certificate => 本端生成的证书所在的位置

ssl_key => /本端生成的密钥所在的位置

ssl_verify_mode => "force_peer"

input {

    beats {

    port => 5044

    codec => "json"

ssl => true

   ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]

   ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"

   ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"

ssl_verify_mode => "force_peer"#(需与ssl_certificate_authorities一起使用

       }

    syslog{

       }

}

 

output {

    # 输出到控制台

    # stdout { }

    # 输出到redis

    redis {

        host => "192.168.80.32"   # redis主机地址

        port => 6379              # redis端口号

        password => "123456"          # redis 密码

       #db => 8                   # redis数据库编号

        data_type => "channel"    # 使用发布/订阅模式

        key => "logstash_list_0"  # 发布通道名称

    }

    #输出到kafka

    kafka {

        bootstrap_servers => "192.168.80.42:9092"

        topic_id          => "test" 

       }     

    #输出到es

    elasticsearch {

       hosts => "node18:9200"

       codec => json

       }

}

filebeat的配置文件: 

#=================== Filebeat prospectors ========================

filebeat.prospectors: 

- input_type: log

 paths:

    - /home/admin/helloworld/logs/*.log

#----------------------------- Logstash output --------------------------------

output.logstash:

# The Logstash hosts

 hosts: ["192.168.80.18:5044"]

#加密传输

 ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]

  ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"

  ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key" 













logstash(非filebeat)进行文件采集,输出到kafka缓存,读取kafka数据并处理输出到文件或es

从文件采集到kafka:

input {

 file {

        path => [

            # 这里填写需要监控的文件

            "/home/admin/helloworld/logs/catalina.out"

        ]

    }

}

output {

 kafka {

    # 输出到控制台

    # stdout { }

 输出到kafka

  bootstrap_servers => "192.168.80.42:9092"

    topic_id          => "test"

    }

}

从kafka或者redis读取数据输出到es或者文件:

input{

#从redis读取

 redis {

        host => "192.168.80.32"   # redis主机地址

        port => 6379              # redis端口号

       password  => "123456"      # redis 密码

        #db => 8                   # redis数据库编号

        data_type => "channel"    # 使用发布/订阅模式

        key => "logstash_list_0"  # 发布通道名称

}

#从kafka读取

kafka {

        bootstrap_servers => "192.168.80.42:9092"

           topics          => ["test"]

        auto_offset_reset => "earliest"

       }

}

 

output {

  #输出到文件

    file {

        path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log" # 指定写入文件路径

#       message_format => "%{host} %{message}"         # 指定写入格式

        flush_interval => 0                             # 指定刷新间隔,0代表实时写入

     codec => json

       }

 #输出到es

   elasticsearch {

       hosts => "node18:9200"

       codec => json

       }

}











logstash同步mysql数据库数据到es(logstash5版本以上已集成jdbc插件,无需下载安装,直接使用)

从mysql读取数据到es:

input {

 stdin { }

    jdbc {

  jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"

        jdbc_user => "fyyq"

        jdbc_password => "fyyq@2017"

jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"

        jdbc_driver_class => "com.mysql.jdbc.Driver"

        jdbc_paging_enabled => "true"

        statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"

        #schedule => "* * * * *"

    }

 }

 

 output {

     stdout {

        codec => json_lines

    }

  elasticsearch {

        hosts => "node18:9200"

        #index => "mainIndex"

        #document_type => "user"

        #document_id => "%{id}"

    }

}










Logstash-input插件及插件参数概览

所有输入插件都支持以下配置选项:

codec:可选

json (json格式编解码器)

msgpack (msgpack格式编解码器)

plain(文本格式编解码器)

multiline(将多行文本event合并成一个event,eg:将java中的异常跟踪日志合并成一条消)]

常用输入插件:

1、beat-input:Receives events from the Elastic Beats framework,从框架接收事件    Settings:

2、file-input:来自文件的Streams事件(path字段必填项)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html


3、stdin-input:从标准输入读取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html


4、syslog-input:将syslog消息作为事件读取

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html


5、tcp-input:从TCP读取事件(port字段必填项)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html


6、udp-input:通过UDP读取事件(port字段必填项)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html


7、twitter-input:从Twitter Streaming API读取事件(相对常用场景)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html

consumer_keyconsumer_secretoauth_tokenoauth_token_secret必填项)


8、redis-input:从Redis实例读取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html

data_type["list", "channel", "pattern_channel"]、key必填项,)


9、kafka-input:从Kafka主题中读取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

(参数过多,自行查看)


10、jdbc-input:从JDBC数据创建事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

jdbc_connection_stringjdbc_driver_classjdbc_user必填项)


11、http-input:通过HTTP或HTTPS接收事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html


12、elasticsearch-input:从Elasticsearch集群读取查询结果

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html


13、exec-input:将shell命令的输出捕获为事件(command字段必填项)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html


非 常用输入插件:

自行进入logstash的插件中心进行查看,有需要自行配置

总:https://www.elastic.co/guide/en/logstash/current/input-plugins.html











Logstash-filter插件及插件参数概览

所有处理插件均支持的配置:

常用处理插件:

1、 grok-filter:可以将非结构化日志数据解析为结构化和可查询的内容

https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics

grok模式的语法是%{SYNTAX:SEMANTIC}

SYNTAX是与您的文本匹配的模式的名称

SEMANTIC是您为匹配的文本提供的标识符

grok是通过系统预定义的正则表达式或者通过自己定义正则表达式来匹配日志中的各个值

正则解析式比较容易出错,建议先调试(地址):

grok debugger调试:http://grokdebug.herokuapp.com/


grok事先已经预定义好了许多正则表达式规则,该规则文件存放路径:

/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns

等等,可自行进入查看。

示例一:

初始输入的message是:

经过grok的正则分析后:

示例二:

COMBINEDAPACHELOG的具体内容见:

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd

初始输入message为:

经过grok正则分析后:

示例三(自定义grok表达式mypattern[A-Z]):

初始输入message:

经过grok正则分析后:

示例四(移除重复字段):

初始输入message:

经过grok正则解析后(json格式):

示例五(过滤筛选catalina.out文件中的信息,message字段已移除):

【Data在pattern中的定义是:.*? GREEDYDATA在pattern中的定义是:.*】


初始输入message:

经过grok正则解析后(截图及json格式如下):

常用参数:

1)match:match作用:用来对字段的模式进行匹配

2)patterns_dir:用来指定规则的匹配路径,如果使用logstash自定义的规则时,不需要写此参数。Patterns_dir可以同时制定多个存放过滤规则的目录;

3)remove_field:如果匹配到某个”日志字段,则将匹配的这个日志字段从这条日志中删除(多个以逗号隔开)

2、 clone-filter:克隆过滤器用于复制事件

3、  drop-filter:丢弃所有活动

4、  json-filter:解析JSON事件

5、  kv-filter:解析键值对


非常用参数:参考教程:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html










Logstash-output插件及插件参数概览

所有输出插件均支持以下配置:

常用插件:

1、Elasticsearch-output:此插件是在Elasticsearch中存储日志的推荐方法。如果您打算使用Kibana Web界面,则需要使用此输出


2、file-output:此输出将事件写入磁盘上的文件(path字段必填项)


3、kafka-output:将事件写入Kafka主题(topic_id是必填项)


4、 redis-output:此输出将使用RPUSH将事件发送到Redis队列


5、stdout-output:一个简单的输出,打印到运行Logstash的shell的STDOUT


非常用插件:参考官网教程链接:https://www.elastic.co/guide/en/logstash/current/output-plugins.html





文章原文链接:https://www.cnblogs.com/qingqing74647464/p/9378385.html



















我们的交流基地,“JAVA互联网技术交流:789650498”欢迎小伙伴们一起来交流:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,104评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,816评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,697评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,836评论 1 298
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,851评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,441评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,992评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,899评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,457评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,529评论 3 341
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,664评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,346评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,025评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,511评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,611评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,081评论 3 377
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,675评论 2 359

推荐阅读更多精彩内容