Logstash采集日志信息输出至ElasticSearch集群

  程序运行时日志内容繁多,格式复杂,当系统运行时出现不符合预期的或错误的结果时,怎样才能从运行时日志信息中快速提取系统中的重点数据信息,并对出现的错误进行归类统计,实时了解系统的运行状态?

  通过Logstash监控系统运行时日志文件,对所需关注的重点信息字段进行过滤提取,将日志文件中过滤提取的字段数据输出到ElasticSearch搜索引擎进行结构化存储,并通过与Spring data elasticsearch和Spring boot框架的集成,实现通过Java程序在Java Web项目界面统计查询ElasticSearch中保存的数据信息。

实现过程:

一、部署ElasticSearch集群

  ElasticSearch版本更新较快,如果需要与Spring boot + Spring data ElasticSearch进行集成,需注意Spring boot 和 Spring data ElasticSearch框架是否对所选版本的ElasticSearch提供了很好的支持。

  目前Spring boot + Spring Data ElasticSearch + ElasticSearch的版本对应关系


Spring Boot Version (x)    Spring Data Elasticsearch Version (y)    Elasticsearch Version (z)

x <= 1.3.5                  y <= 1.3.4                              z <= 1.7.2

x >= 1.4.x                  2.0.0 <=y <5.0.0                        2.0.0 <= z < 5.0.0

  部署ElasticSearch可直接在ELK官网下载对应版本的源码,并解压到对应目录下。

1、官网下载ElasticSearch、Logstash源代码,网站链接:
https://www.elastic.co/cn/downloads/elasticsearch

  按照版本间依赖关系注意版本的选择。测试部署环境为Ubuntu 16.04LTS,此处选择ElasticSearch 1.7.6 TAR版本。

下载ElasticSearch

2、将解压后的ElasticSearch源代码文件上传拷贝至/opt目录下。

解压上传ElasticSearch源码

3、配置ElasticSearch插件。
1)执行

vi /opt/ElasticSearch-1.7.6/config/elasticsearch.yml

编辑配置文件。配置文件详解如下:


cluster.name: elasticsearch

配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。

node.name: "Franz Kafka"

节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。

node.master: true

指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。

node.data: true

指定该节点是否存储索引数据,默认为true。

index.number_of_shards: 5

设置默认索引分片个数,默认为5片。

index.number_of_replicas: 1

设置默认索引副本个数,默认为1个副本。

path.conf: /path/to/conf

设置配置文件的存储路径,默认是es根目录下的config文件夹。

path.data: /path/to/data

设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:

path.data: /path/to/data1,/path/to/data2

path.work: /path/to/work

设置临时文件的存储路径,默认是es根目录下的work文件夹。

path.logs: /path/to/logs

设置日志文件的存储路径,默认是es根目录下的logs文件夹

path.plugins: /path/to/plugins

设置插件的存放路径,默认是es根目录下的plugins文件夹

bootstrap.mlockall: true

设置为true来锁住内存。因为当jvm开始swapping时es的效率 会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。 同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit -l unlimited`命令。

network.bind_host: 192.168.0.1

设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。

network.publish_host: 192.168.0.1

设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。

network.host: 192.168.0.1

这个参数是用来同时设置bind_host和publish_host上面两个参数。

transport.tcp.port: 9300

设置节点间交互的tcp端口,默认是9300。

transport.tcp.compress: true

设置是否压缩tcp传输时的数据,默认为false,不压缩。

http.port: 9200

设置对外服务的http端口,默认为9200。

http.max_content_length: 100mb

设置内容的最大容量,默认100mb

http.enabled: false

是否使用http协议对外提供服务,默认为true,开启。

gateway.type: local

gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。

gateway.recover_after_nodes: 1

设置集群中N个节点启动时进行数据恢复,默认为1。

gateway.recover_after_time: 5m

设置初始化数据恢复进程的超时时间,默认是5分钟。

gateway.expected_nodes: 2

设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。

cluster.routing.allocation.node_initial_primaries_recoveries: 4

初始化数据恢复时,并发恢复线程的个数,默认为4。

cluster.routing.allocation.node_concurrent_recoveries: 2

添加删除节点或负载均衡时并发恢复线程的个数,默认为4。

indices.recovery.max_size_per_sec: 0

设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。

indices.recovery.concurrent_streams: 5

设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。

discovery.zen.minimum_master_nodes: 1

设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)

discovery.zen.ping.timeout: 3s

设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。

discovery.zen.ping.multicast.enabled: false

设置是否打开多播发现节点,默认是true。

discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]

设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。

下面是一些查询时的慢日志参数设置

index.search.slowlog.level: TRACE

index.search.slowlog.threshold.query.warn: 10s

index.search.slowlog.threshold.query.info: 5s

index.search.slowlog.threshold.query.debug: 2s

index.search.slowlog.threshold.query.trace: 500ms

index.search.slowlog.threshold.fetch.warn: 1s

index.search.slowlog.threshold.fetch.info: 800ms

index.search.slowlog.threshold.fetch.debug:500ms

index.search.slowlog.threshold.fetch.trace: 200ms


4、安装ElasticSearch-head插件,使用ElasticSearch视图界面管理ElasticSearch集群。

  1. 执行:
         cd /opt/ElasticSearch-1.7.6/bin,
        ./plugin -install mobz/ElasticSearch-head

命令完成安装。

2)进入ElasticSearch目录,执行

bin/./elasticsearch

启动ElasticSearch。

启动ElasticSearch

2)浏览器输入

http://IP:9200/_plugin/head

打开ElasticSearch视图管理界面,对ElasticSearch集群进行管理。

ElasticSearch视图管理界面

二、部署Logstash

1、官网下载Logstash源代码,并考至/opt目录下。此处选择Logstash-2.4.1版本。

2、编辑配置logstash.conf文件,配置日志采集、处理及输出所需插件。本文使用input插件监控指定目录下的日志文件,使用filter插件对日志内容进行处理,使用output插件将采集的日志信息输出到ElasticSearch集群进行管理。文件内容详解如下:


input插件使用详解:

An input plugin enables a specific source of events to be read by Logstash.

path字段:指定所需监视日志文件的路径信息,可同时设置监视多个日志文件,详细使用方法描述如下:

This is a required setting.

Value type is [array](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#array)

There is no default value for this setting.

The path(s) to the file(s) to use as an input. You can use filename patterns here, such as /var/log/*.log. If you use a pattern like /var/log/**/*.log, a recursive search of /var/log will be done for all *.log files. Paths must be absolute and cannot be relative.

You may also configure multiple paths. See an example on the [Logstash configuration page](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#array).

type字段:指定信息文档类型,详细描述如下:

Value type is [string](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#string)

There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.

start_position字段:

Value can be any of: beginning, end

Default value is "end"

Choose where Logstash starts initially reading files: at the beginning or at the end. The default behavior treats files like live streams and thus starts at the end. If you have old data you want to import, set this to *beginning*.

This option only modifies "first contact" situations where a file is new and not seen before, i.e. files that don’t have a current position recorded in a sincedb file read by Logstash. If a file has already been seen before, this option has no effect and the position recorded in the sincedb file will be used.

codec字段:指定文档解码字符集

Value type is [codec](https://www.elastic.co/guide/en/logstash/2.4/configuration-file-structure.html#codec)

Default value is "plain"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

其他插件工具的作用及使用方法可查询官方文档进行获取,

网站地址:https://www.elastic.co/guide/en/logstash/2.4/index.html

 input {

       file{

           path => "/mnt/DShare/hdfsputter_info.log"

           type => "error-log"

           start_position => "beginning"

               codec => plain {

                   charset => "GBK"

              }

           }

 }

 filter {

        grok {

          patterns_dir => "../logstash-2.4.1/grok-patterns"

          match => { "message" => "%{DROP:drop}"}

         }

         if [drop]{

             drop{}

          }

//多行处理插件,根据正则表达式中定义的字符进行匹配,并将符合匹配条件的行向前合并为一行

         multiline {

                 pattern => "Error\sparsing\sline"

                 negate => "true"

                 what => "previous"

         }

         multiline{

                 pattern => "HBasePutter.java:314"

                 negate => "true"

                 what => "previous"

         }

//grok插件中通过在grok-patterns中定义的正则表达式匹配并提取日志中对应的字段内容

         grok {

                 patterns_dir => "../logstash-2.4.1/grok-patterns"

                 break_on_match => false

                 match => [

                             "message" , ".*%{PASSTIME:passtime}.*",

                             "message" , ".*%{CARPLATE:carplate}.*",

                             "message" , ".*%{PLATECOLOR:platecolor}.*",

                             "message" , ".*%{TGSID:tgsid}.*",

                             "message" , ".*%{LOCATIONID:locationId}.*",

                             "message" , ".*%{DRIVEWAY:driveway}.*",

                             "message" , ".*%{DRIVEDIR:drivedir}.*",

                             "message" , ".*%{CAPTUREDIR:capturedir}.*",

                             "message" , ".*%{CARBRAND:carbrand}.*",

                             "message" , ".*%{CARCOLOR:carcolor}.*"

                          ]

                 remove_field => "message"

         }

 #      date {

 #              match => ["passtime","yyyy-MM-dd HH:mm:ss"]

 #              target => "passtime"

 #      }

 }

//output插件将提取的日志字段信息输出到指定的ElasticSearch集群进行存储。

 output {

         stdout{

                codec => rubydebug

         }

         elasticsearch {

             hosts => ["192.168.174.141"]

             index => "error-log"

             document_type => "error-log"

             manage_template => true

             template => "../logstash-2.4.1/errorlog-template.json"

             template_name => "error-log"

             template_overwrite => true

         }

 }

3、编辑grok_patterns文件,grok_patterns文件中定义可以匹配所需字段信息的正则表达式,Logstash运行时根据grok_patterns文件中定义的正则表达式去匹配日志文件中的内容,提取所需字段信息。根据需求,定义grok_patterns文件中定义字段匹配内容如下:

 DROP     INFO|WARN|DEBUG

  IPADDR     (?<=\bIPADDR=)\w+\b.[0-9]{1,.[0-9]{1,}.[0-9]{1,}

   SENDTIME     (?<=\bSENDTIME=)\w+\b-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}\s[a-z0-9A-Z]{0,}

   PASSTIME      (?<=\bPASSTIME=)\w+\b-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}

   CARPLATE      (?<=\bCARPLATE=).[0-9A-Z]{6}\b

   PLATETYPE     [0-9]{2,}

   SPEED

   PLATECOLOR     (?<=Error\sparsing\sline\s:\s\[PLATECOLOR=)\w+\b

   LOCATIONID     (?<=Error\sparsing\sline\s:\s\[LOCATIONID=)\w+\b

 DRIVEWAY     (?<=\bDRIVEWAY=)\w+\b

 DRIVEDIR     (?<=\bDRIVEDIR=)\w+\b

 CAPTUREDIR     (?<=Error\sparsing\sline\s:\s\[CAPTUREDIR=).*?(?=])

 CARCOLOR     (?<=Error\sparsing\sline\s:\s\[CARCOLOR=).*?(?=])

 CARBRAND     (?<=Error\sparsing\sline\s:\s\[CARBRAND=)\w+\b

 TGSID     (?<=\bTGSID=)\w+\b

 PLATECOORD     [0-9]{3},[0-9]{3},[0-9]{3},[0-9]{3}

 CABCOORD

 IMGID1     (?<=\bIMGID1=)\w+\b

 IMGID2    (?<=\bIMGID2=)\w+\b

 IMGID3     (?<=\bIMGID3=)\w+\b

4、启动Logstash(注意:启动Logstash前需启动ElasticSearch)。执行

bin/./logstash -f logstash.conf

启动Logstash。日志内容发生变化时,采集日志信息进行处理并输出。

启动Logstash
采集日志信息进行处理并输出

5、浏览器打开elasticsearch_head插件进入elasticsearch视图管理界面。

进入elasticsearch视图管理界面
进入elasticsearch视图管理界面

三、Java通过Spring Data Elasticsearch API访问ElasticSearch集群,查询获取所需数据

1) build.gradle文件中添加ElasticSearch、Spring Data Elasticsearch、spring boot starter data elasticsearch依赖:


compile group: 'org.elasticsearch', name: 'elasticsearch', version: '1.7.6'

compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '1.3.6.RELEASE'

compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-elasticsearch', version: '1.3.6.RELEASE'
build.gradle文件中添加依赖

2)创建实体类,并使用 @Document(indexName = "索引名", type = "类型名")进行注解。

import org.springframework.data.annotation.Id;

import org.springframework.data.elasticsearch.annotations.Document;

/**

* Created by Grandland on 2017/12/9.

*/

@Document(indexName ="error-log", type ="error-log")

public class ErrorInfo {

}

3)创建Repository接口,查询操作ElasticSearch中的文档信息。

@Component

@Repository

public interface ErrorInfoRepository extends ElasticsearchRepository {

//定义从ElasticSearch中查询数据接口方法

}

4)controller调用Repository中方法获取ElasticSearch中数据,根据业务需求进行处理封装,返回前台进行解析显示。(代码略)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容