ELK+Beats

1、搜索引擎简介:

    索引组件:获取数据-->建立文档-->文档分析-->文档索引(倒排索引)
    搜索组件:用户搜索接口-->建立查询(将用户键入的信息转换为可处理的查询对象)-->搜索查询-->展现结果
    
    索引组件:Lucene
    搜索组件:Solr, ElasticSearch

ElasticSearch:

是一种分布式的、可重复使用的搜索和分析引擎,能够解决越来越多
的用例。作为弹性堆栈的核心,它集中地存储你的数据,这样你就可
以发现预期和发现意外情况。 

Logstash:

Logstash是一个开源的服务器端数据处理管道,它同时从多个源获取
数据,对其进行转换,然后将其发送到您最喜欢的“stash”(当然,我
们的是Elasticsearch)。

Beats:

搜集和分析日志的工具,比logstash消耗更少的资源
Beats一共分为六种:
    Filebeat:主要用于收集日志数据。
    Metricbeat:进行指标采集,主要用于监控系统和软件的性能。
    Packetbeat:过网络抓包、协议分析,对一些请求响应式的系统通信进行监控和数据收集,可以收集到很多常规方式无法收集到的信息。           
        Winlogbeat:针对windows日志搜集      
        Heartbeat:系统间连通性检测,比如 icmp, tcp, http 等系统的连通性监控。

Kibana:

      把搜索得到的结果可视化   

2.安装

主机版本centos7,ELK6版本的安装包,ELK、Beats版本最好一致
主机:

三台机器都安装Elasticsearch,创建集群
10.10.10.1  Elasticsearch(master),Kibana
10.10.10.2  Elasticsearch(data-node1),logstash
10.10.10.3  Elasticsearch(data-node2),Beats

3、ElasticSearch的程序环境:

安装:

rpm -ivh elasticsearch-x.x.x.prm
配置文件:
    /etc/elasticsearch/elasticsearch.yml  主配置文件
    /etc/elasticsearch/jvm.options   jvm参数配置文件
    /etc/elasticsearch/log4j2.properties   日志配置文件
程序文件:
    /usr/share/elasticsearch/bin/elasticsearch
    /usr/share/elasticsearch/bin/elasticsearch-keystore:

端口:

搜索服务:9200/tcp
集群服务:9300/tcp

修改配置文件/etc/elasticsearch/elasticsearch.yml:

cluster.name: myelk #集群的名字,必须一致
node.name: mater-node  #这个节点的名字
path.data: /data/els/data # 数据存储位置
path.logs: /data/els/logs #日志存储位置
network.host: 0.0.0.0 #监听地址,0.0.0.0监听所有
http.port: 9200   #对外开放的端口
discovery.zen.ping.unicast.hosts: ["node1", "node2", "node3"] #主节点的初始列表,当主节点启动时会探测其他节点      

启动elasticsearch:

systemctl start elasticsearch

查看进程和端口:

ps -aux|grep elasticsearch
netstat -lntp 

RESTful API:

curl  -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
    <BODY>:json格式的请求主体;
    <VERB>:GET,POST,PUT,DELETE
    <PATH>:/index_name/type/Document_ID/
            特殊PATH:/_cat, /_search, /_cluster 
 
           /_search:搜索所有的索引和类型;
            /INDEX_NAME/_search:搜索指定的单个索引;
            /INDEX1,INDEX2/_search:搜索指定的多个索引;
            /s*/_search:搜索所有以s开头的索引;
            /INDEX_NAME/TYPE_NAME/_search:搜索指定的单个索引的指定类型;


curl -XGET 'http://10.1.0.67:9200/_cluster/health?pretty=true'
curl -XGET 'http://10.1.0.67:9200/_cluster/stats?pretty=true'   
curl -XGET 'http://10.1.0.67:9200/_cat/nodes?pretty'
curl -XGET 'http://10.1.0.67:9200/_cat/health?pretty'
curl -XGET 'http://10.1.0.67:9200/_cat/indices?v'   

6、logstash

安装logstash同上
logstash配置:

主配置文件时logstash.yml
path.data: /var/lib/logstash
http.host: "10.10.10.2"
http.port: 9600
path.logs: /var/log/logstash

处理具体日志文件,配置在/etc/logstash/conf.d目录下,并以.conf结尾

        input { 输入
            ...
        }
        
        filter{ 过滤
            ...
        }
        
        output { 输出
            ...
        }
        

简单示例配置:

示例1:
       input {
            stdin {}
       }
      output {
            stdout {
            codec => rubydebug 显示在当前屏幕上
            }
       }

示例2:从文件输入数据,经grok过滤器插件过滤之后输出至标准输出:

            input {
                file {
                    path => ["/var/log/httpd/access_log"]
                    start_position => "beginning" 从头开始读取
                }
            }

            filter {
                grok {
                    match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                    }
                    remove_field: "message"
                }
            }

            output {
                stdout {
                    codec => rubydebug
                }
            }

示例3:date filter插件示例:

filter {
          grok {
              match => {  "message" => "%{HTTPD_COMBINEDLOG}"
                   }
          remove_field => "message"
          } 
         date {
               match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
               remove_field => "timestamp"
                }
         }              

示例4:mutate filter插件

          filter {
                   grok {
                           match => {
                                   "message" => "%{HTTPD_COMBINEDLOG}"
                           }
                   }
                   date {
                           match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                   }
                   mutate {
                           rename => {
                                   "agent" => "user_agent"
                           }
                   }
           } 

示例5:geoip插件

   filter {
         grok {
               match => { "message" => "%{HTTPD_COMBINEDLOG}"
                }
             }
          date {
               match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                  }
          mutate {
               rename => { "agent" => "user_agent"
                  }
               }
           geoip {
                 source => "clientip"
                  target => "geoip"
                  database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
                  }
            }            

示例6:使用Redis

(1) 从redis加载数据
                input {
                    redis {
                        batch_count => 1
                        data_type => "list"
                        key => "logstash-list"
                        host => "192.168.0.2"
                        port => 6379
                        threads => 5
                    }
                } 
            
(2) 将数据存入redis
                output {
                    redis {
                        data_type => "channel"
                        key => "logstash-%{+yyyy.MM.dd}"
                    }
                } 

示例7:将数据写入els cluster

output {
                elasticsearch {
                    hosts => ["http://node1:9200/","http://node2:9200/","http://node3:9200/"]
                    user => "ec18487808b6908009d3"
                    password => "efcec6a1e0"
                    index => "logstash-%{+YYYY.MM.dd}"
                    document_type => "apache_logs"
                }
            }        

示例8:综合示例,启用geoip

 input {
         beats {
             port => 5044
                 }
         }

filter {
          grok {
               match => { 
                    "message" => "%{COMBINEDAPACHELOG}"
                    }
                    remove_field => "message"
                }
                geoip {
                    source => "clientip"
                    target => "geoip"
                    database => "/etc/logstash/GeoLite2-City.mmdb"
                }
            }
output {
              elasticsearch {
               hosts => ["http://172.16.0.67:9200","http://172.16.0.68:9200","http://172.16.0.69:9200"]
                index => "logstash-%{+YYYY.MM.dd}"
                action => "index"
                document_type => "apache_logs"
                }
            }        

grok:

        %{SYNTAX:SEMANTIC}
            SYNTAX:预定义的模式名称;
            SEMANTIC:给模式匹配到的文本所定义的键名;
            
            1.2.3.4 GET /logo.jpg  203 0.12
            %{IP:clientip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
            
            { clientip: 1.2.3.4, method: GET, request: /logo.jpg, bytes: 203, duration: 0.12}
            
            
            %{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)" %{HOST:domain} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} "(%{WORD:x_forword}|-)" (%{URIHOST:upstream_host}|-) %{NUMBER:upstream_response} (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}) > (%{BASE16FLOAT:request_time})
            
             "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
             
             filter {
                grok {
                    match => {
                        "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
                    }
                    remote_field: message
                }   
            }
            
            nginx.remote.ip
            [nginx][remote][ip] 
            
            
            filter {
                grok {
                    match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx
                    ][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\
                    " %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"
                    %{DATA:[nginx][access][agent]}\""] }
                    remove_field => "message"
                }  
                date {
                    match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
                    remove_field => "[nginx][access][time]"
                }  
                useragent {
                    source => "[nginx][access][agent]"
                    target => "[nginx][access][user_agent]"
                    remove_field => "[nginx][access][agent]"
                }  
                geoip {
                    source => "[nginx][access][remote_ip]"
                    target => "geoip"
                    database => "/etc/logstash/GeoLite2-City.mmdb"
                }  
                                                                
            }   
            
            output {                                                                                                     
                elasticsearch {                                                                                      
                    hosts => ["node1:9200","node2:9200","node3:9200"]                                            
                    index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}"                                              
                }                                                                                                    
            }
            
            注意:
                1、输出的日志文件名必须以“logstash-”开头,方可将geoip.location的type自动设定为"geo_point";
                2、target => "geoip"
            
    除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
            
            
    示例:使用grok结构化nginx访问日志 
        filter {
                grok {
                        match => {
                                "message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\""
                        }
                        remove_field => "message"
                }
                date {
                        match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                        remove_field => "timestamp"
                }
        }            
            
    示例:使用grok结构化tomcat访问日志 
        filter {
                grok {
                        match => {
                                "message" => "%{HTTPD_COMMONLOG}"
                        }
                        remove_field => "message"
                }
                date {
                        match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                        remove_field => "timestamp"
                }

8、 Nginx日志Json化:

        log_format   json  '{"@timestamp":"$time_iso8601",'
                    '"@source":"$server_addr",'
                    '"@nginx_fields":{'
                        '"client":"$remote_addr",'
                        '"size":$body_bytes_sent,'
                        '"responsetime":"$request_time",'
                        '"upstreamtime":"$upstream_response_time",'
                        '"upstreamaddr":"$upstream_addr",'
                        '"request_method":"$request_method",'
                        '"domain":"$host",'
                        '"url":"$uri",'
                        '"http_user_agent":"$http_user_agent",'
                        '"status":$status,'
                        '"x_forwarded_for":"$http_x_forwarded_for"'
                    '}'
                '}';

        access_log  logs/access.log  json;              
Conditionals
    Sometimes you only want to filter or output an event under certain conditions. For that, you can use a conditional.

    Conditionals in Logstash look and act the same way they do in programming languages. Conditionals support if, else if and else statements and can be nested.
    
    The conditional syntax is:

        if EXPRESSION {
        ...
        } else if EXPRESSION {
        ...
        } else {
        ...
        }    
        
        What’s an expression? Comparison tests, boolean logic, and so on!

        You can use the following comparison operators:

        equality: ==, !=, <, >, <=, >=
        regexp: =~, !~ (checks a pattern on the right against a string value on the left) inclusion: in, not in
        
        The supported boolean operators are:

            and, or, nand, xor
        
        The supported unary operators are:

            !
        Expressions can be long and complex. Expressions can contain other expressions, you can negate expressions with !, and you can group them with parentheses (...).
        
        filter {
        
            if [type] == 'tomcat-accesslog' {
                grok {}
            }
            
            if [type] == 'httpd-accesslog' {
                grok {}
            }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容

  • 搜索引擎是什么? 搜索引擎是指根据一定的策略、运用特定的计算机程序从互联网上搜集信息,在对信息进行组织和处理后,为...
    欢醉阅读 1,270评论 4 10
  • Solr&ElasticSearch原理及应用 一、综述 搜索 http://baike.baidu.com/it...
    楼外楼V阅读 7,291评论 1 17
  • 嗬,时间过得真快,今天豆田已经满月了。 按理说,我这种仪式感特别强的人,在这种不一样的日子里,总该发表个激情澎湃的...
    四小姐的家阅读 839评论 0 50
  • 星姐摇了摇头,无奈道:“小鱼,你心也太软了。好吧好吧,看在你的面子上。” “谢谢星姐。星姐真是好人。”杨小鱼甜甜的...
    飄雲阅读 203评论 0 1
  • 翻译自:https://blog.markdaws.net 相关工程地址:https://github.com/m...
    AShawn阅读 4,901评论 2 1