Installing the Elastic Stack (ELK)

Install JDK 1.8

yum install java-1.8.0-openjdk -y

Append the following to /etc/profile

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64/jre
export CLASSPATH=.:$JAVA_HOME/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

Make the variables take effect

source /etc/profile
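
A quick sanity check that the JDK is now on the PATH (a minimal sketch; the exact version string depends on the package yum installed):

java -version
# should print something like: openjdk version "1.8.0_242"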

Install Elasticsearch

Download version 6.8.6 from the official download page

Extract the archive
tar -zxf elasticsearch-6.8.6.tar.gz
Enter the directory
cd elasticsearch-6.8.6/

Disable swap

swapoff -a
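
swapoff -a only disables swap until the next reboot. To keep it off permanently you can also comment out the swap entries in /etc/fstab; a hedged sketch (back up the file first and check the result before rebooting):

cp /etc/fstab /etc/fstab.bak
sed -i '/\bswap\b/s/^/#/' /etc/fstab    # comment out any swap lines
grep swap /etc/fstab                    # verify they are now commented out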

Set the JVM heap size

vim config/jvm.options

-Xms2g
-Xmx2g

The heap is sized so that Elasticsearch has enough memory to work with. The usual guideline is to give it half of the machine's RAM, but on hosts with more than 64 GB of RAM keep the heap below 32 GB: above that the JVM can no longer use compressed object pointers, so pointers get larger and memory is wasted.

Edit the Elasticsearch configuration file

vim config/elasticsearch.yml 

node.name: master
bootstrap.memory_lock: true
network.host: 10.0.0.16

node.name is the name this host goes by inside Elasticsearch.
bootstrap.memory_lock locks the heap in memory so it cannot be swapped out (a verification sketch follows below).
network.host is the IP on which the service is exposed; it can also be 0.0.0.0.
A single node only needs these settings. If you want a multi-node cluster, see my other article on building one with Ansible: https://www.jianshu.com/p/3af7b723f678
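
Once Elasticsearch has been started (later in this guide), you can confirm that bootstrap.memory_lock actually took effect through the nodes info API; a minimal sketch, assuming the network.host above:

curl 'http://10.0.0.16:9200/_nodes?filter_path=**.mlockall&pretty'
# every node should report "mlockall" : true; false usually means the
# memlock limits configured below were not applied to the elas user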

Host-level changes

Kernel parameter (sysctl) tuning

vim /etc/sysctl.conf

net.ipv4.ip_forward = 0
net.ipv4.conf.default.rp_filter = 1
net.ipv4.conf.default.accept_source_route = 0
kernel.sysrq = 0
kernel.core_uses_pid = 1
net.ipv4.tcp_syncookies = 1
kernel.msgmnb = 65536
kernel.msgmax = 65536
kernel.shmmax = 68719476736
kernel.shmall = 5294967296
vm.max_map_count = 655360
vm.swappiness = 1

Apply the settings

sysctl -p
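
The value Elasticsearch cares about most here is vm.max_map_count (it must be at least 262144); you can confirm the new values were applied:

sysctl vm.max_map_count vm.swappiness
# expected: vm.max_map_count = 655360, vm.swappiness = 1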

Update the limits

vim /etc/security/limits.conf

* soft nofile 65536
* hard nofile 131072
* soft nproc unlimited
* hard nproc unlimited
elas soft memlock unlimited
elas hard memlock unlimited
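
limits.conf is only read for new login sessions, so the values apply the next time a user logs in. A quick way to check the limits the elas user will get (elas is created in the next step):

su - elas -c 'ulimit -n'   # open files, expect 65536
su - elas -c 'ulimit -l'   # max locked memory, expect unlimited
su - elas -c 'ulimit -u'   # max user processes, expect unlimited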

Create a user

For security reasons Elasticsearch refuses to run as root, so create a new user and give it ownership of the installation directory.

useradd elas

[root@localhost elasticsearch-6.8.6]# pwd
/application/elasticsearch-6.8.6

chown -R elas .

Start Elasticsearch

su elas

nohup ./bin/elasticsearch &

Using 'nohup' together with '&' runs the process in the background and writes its log to the nohup.out file in the current directory.
Use 'netstat -lntp' to check that ports 9200 and 9300 are listening.
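
If the ports are not up yet, Elasticsearch may still be starting; a quick sketch for watching the startup log and checking the ports:

tail -f nohup.out                          # wait for a line containing "started"
netstat -lntp | grep -E ':(9200|9300)\s'   # 9200 = HTTP, 9300 = transport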

Verify that Elasticsearch is up

curl http://<your-server-IP>:9200

{
  "name" : "master",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "YN-Hq9grRcCiX-X4KD2GhQ",
  "version" : {
    "number" : "6.8.6",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "3d9f765",
    "build_date" : "2019-12-13T17:11:52.013738Z",
    "build_snapshot" : false,
    "lucene_version" : "7.7.2",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

If you get a response like this, your Elasticsearch is working.

Install Kibana

Download Kibana 6.8.6 from the official download page

Extract the archive

tar -zxf kibana-6.8.6-linux-x86_64.tar.gz

cd kibana-6.8.6-linux-x86_64/

Edit the Kibana configuration file

vim config/kibana.yml

server.port: 5601
server.host: "10.0.0.16"
elasticsearch.hosts: ["http://10.0.0.16:9200"]

Start Kibana

nohup ./bin/kibana & 

netstat -lntp

Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 10.0.0.16:5601          0.0.0.0:*               LISTEN      19137/./bin/../node 
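
Besides checking the port, you can ask Kibana itself whether it is healthy via its status API; a minimal sketch, assuming the server.host above:

curl -s -o /dev/null -w '%{http_code}\n' 'http://10.0.0.16:5601/api/status'
# 200 means Kibana is up and answering; the JSON body also reports an overall state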

Install Metricbeat

Download Metricbeat from the official download page

rpm -vi metricbeat-6.8.6-x86_64.rpm

From the Metricbeat installation directory, enable the system module

metricbeat modules enable system
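
Because Elasticsearch and Kibana here listen on 10.0.0.16 rather than localhost, you will likely need to point Metricbeat at them in /etc/metricbeat/metricbeat.yml before running setup; a minimal sketch of the relevant sections:

# /etc/metricbeat/metricbeat.yml (excerpt)
setup.kibana:
  host: "10.0.0.16:5601"

output.elasticsearch:
  hosts: ["10.0.0.16:9200"]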

Set up the initial environment (load the index template and Kibana dashboards)

metricbeat setup -e

Start Metricbeat

service metricbeat start
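
Once Metricbeat is running, system metrics should start showing up in a daily metricbeat-* index; a quick check against Elasticsearch:

curl 'http://10.0.0.16:9200/_cat/indices/metricbeat-*?v'
# a metricbeat-6.8.6-YYYY.MM.DD index with a growing docs.count should appear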

Install Logstash

Download Logstash 6.8.6 from the official download page

tar -zxf logstash-6.8.6.tar.gz
cd logstash-6.8.6/

Install the Grok filter plugin

bin/logstash-plugin install logstash-filter-grok

Validating logstash-filter-grok
Installing logstash-filter-grok
Installation successful

Install Filebeat

Download version 6.8.6 from the official download page

tar -zxf filebeat-6.8.6-linux-x86_64.tar.gz

cd filebeat-6.8.6-linux-x86_64/

Trying out Filebeat

In this part you will create a Logstash pipeline that uses Filebeat to take Apache web logs as input, parses those logs to create specific named fields, and writes the parsed data to an Elasticsearch cluster. Rather than defining the pipeline on the command line, you will define it in a configuration file.

First, download the sample data set used in this example and unzip the file.

Create a directory
cd ..

mkdir log_demo

cd log_demo

Unzip the file on Windows, then upload it
rz -E

[root@localhost log_demo]# ls
logstash-tutorial-dataset

cd /application/filebeat-6.8.6-linux-x86_64

Back up the configuration file

cp filebeat.yml filebeat_bak

Empty the file and write the following

filebeat.prospectors:
- type: log
  paths:
    - /application/log_demo/logstash-tutorial-dataset

output.logstash:
  hosts: ["10.0.0.16:5044"]
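
Before starting Filebeat you can let it validate the YAML and, once Logstash is listening on 5044, test the connection to the output; a sketch using Filebeat's built-in test subcommands:

./filebeat test config -c filebeat.yml    # checks the configuration syntax
./filebeat test output -c filebeat.yml    # checks the connection to 10.0.0.16:5044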

Run Filebeat with the following command

./filebeat -e -c filebeat.yml -d "publish"

In the Logstash installation directory, create a new file named first-pipeline.conf

[root@localhost logstash-6.8.6]# pwd
/application/logstash-6.8.6

vim first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}

output {
    stdout { codec => rubydebug }
}

To validate the configuration, run the following command

bin/logstash -f first-pipeline.conf --config.test_and_exit

Sending Logstash logs to /application/logstash-6.8.6/logs which is now configured via log4j2.properties
[2020-02-28T11:12:18,351][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/application/logstash-6.8.6/data/queue"}
[2020-02-28T11:12:18,369][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/application/logstash-6.8.6/data/dead_letter_queue"}
[2020-02-28T11:12:18,701][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified

The --config.test_and_exit option parses the configuration file and reports any errors

If the configuration file passes the test, start Logstash with the following command

bin/logstash -f first-pipeline.conf --config.reload.automatic

If the pipeline is working, you should see a series of events like the following on the console

{
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" :24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
    "prospector" => {
        "type" => "log"
    },
        "source" => "/application/log_demo/logstash-tutorial-dataset",
        "offset" => 24248,
         "input" => {
        "type" => "log"
    },
    "@timestamp" => 2020-02-28T03:00:09.859Z,
          "beat" => {
         "version" => "6.8.6",
            "name" => "localhost.localdomain",
        "hostname" => "localhost.localdomain"
    },
          "host" => {
        "name" => "localhost.localdomain"
    },
           "log" => {
        "file" => {
            "path" => "/application/log_demo/logstash-tutorial-dataset"
        }
    }
}

Parsing web logs with the Grok filter plugin

You now have a working pipeline that reads log lines from Filebeat. However, you'll notice that the format of the log messages is not ideal. You want to parse the log messages to create specific named fields from the logs. To do this, you'll use the grok filter plugin.

The grok filter plugin is one of several plugins that are available by default in Logstash. For details on how to manage Logstash plugins, see the reference documentation for the plugin manager.

With the grok filter plugin you can parse unstructured log data into something structured and queryable.

Because the grok filter plugin looks for patterns in the incoming log data, configuring it requires you to decide how to identify the patterns that matter for your use case. A representative line from the sample web server log looks like this:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

The IP address at the beginning of the line is easy to identify, and so is the timestamp in brackets. To parse the data you can use the %{COMBINEDAPACHELOG} grok pattern, which structures lines from Apache logs into the following fields:


[Screenshots from the official documentation omitted: the %{COMBINEDAPACHELOG} pattern produces fields such as clientip, ident, auth, timestamp, verb, request, httpversion, response, bytes, referrer and agent, as seen in the output further below.]

Edit the first-pipeline.conf file and add the filter section shown here (the complete file is reproduced below)

vim first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}

output {
    stdout { codec => rubydebug }
}

Delete the Filebeat registry file

Save your changes. Because automatic config reloading is enabled, you do not need to restart Logstash to pick up the changes. However, you do need to force Filebeat to read the log file from the beginning. To do this, go to the terminal window where Filebeat is running and press Ctrl+C to shut Filebeat down. Then delete the Filebeat registry file. For example, run:

pwd
/application/filebeat-6.8.6-linux-x86_64

rm -f data/registry

Since Filebeat stores the state of each file it harvests in the registry, deleting the registry file forces Filebeat to read all the files it is harvesting from the beginning.

Next, restart Filebeat with the following command:

./filebeat -e -c filebeat.yml -d "publish"

If Filebeat has to wait for Logstash to reload its configuration file, there may be a short delay before Filebeat starts processing events.

Watch the Logstash console again

After Logstash applies the grok pattern, the events have the following JSON representation

{
           "host" => {
        "name" => "localhost.localdomain"
    },
           "auth" => "-",
     "@timestamp" => 2020-02-28T05:32:04.645Z,
            "log" => {
        "file" => {
            "path" => "/application/log_demo/logstash-tutorial-dataset"
        }
    },
         "offset" => 24248,
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
          "ident" => "-",
       "clientip" => "86.1.76.62",
     "prospector" => {
        "type" => "log"
    },
       "response" => "200",
           "verb" => "GET",
    "httpversion" => "1.1",
       "@version" => "1",
      "timestamp" => "04/Jan/2015:05:30:37 +0000",
          "input" => {
        "type" => "log"
    },
       "referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
          "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
         "source" => "/application/log_demo/logstash-tutorial-dataset",
          "bytes" => "4877",
           "beat" => {
            "name" => "localhost.localdomain",
         "version" => "6.8.6",
        "hostname" => "localhost.localdomain"
    },
        "request" => "/style2.css"
}

Enriching the data with the Geoip filter plugin

In addition to parsing log data for better searching, filter plugins can derive supplementary information from existing data. For example, the geoip plugin looks up IP addresses, derives geographic location information from the addresses, and adds that location information to the logs.

Configure your Logstash instance to use the geoip filter plugin by adding the following lines to the filter section of the first-pipeline.conf file:

vim first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}

output {
    stdout { codec => rubydebug }
}

Save your changes. To force Filebeat to read the log file from the beginning, as described earlier, shut down Filebeat (press Ctrl+C), delete the registry file, and then restart Filebeat with the following commands:

rm -f data/registry

./filebeat -e -c filebeat.yml -d "publish"

Look at the Logstash output again and you will see that geographic location information has been added:

{
         "offset" => 24248,
          "geoip" => {
           "postal_code" => "SW12",
             "city_name" => "Balham",
              "latitude" => 51.4434,
                    "ip" => "86.1.76.62",
              "timezone" => "Europe/London",
        "continent_code" => "EU",
             "longitude" => -0.1468,
         "country_code2" => "GB",
              "location" => {
            "lat" => 51.4434,
            "lon" => -0.1468
        },
          "country_name" => "United Kingdom",
           "region_code" => "LBH",
         "country_code3" => "GB",
           "region_name" => "Lambeth"
    },
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "httpversion" => "1.1",
           "verb" => "GET",
      "timestamp" => "04/Jan/2015:05:30:37 +0000",
           "host" => {
        "name" => "localhost.localdomain"
    },
       "clientip" => "86.1.76.62",
       "response" => "200",
       "@version" => "1",
          "bytes" => "4877",
        "request" => "/style2.css",
        "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
           "auth" => "-",
            "log" => {
        "file" => {
            "path" => "/application/log_demo/logstash-tutorial-dataset"
        }
    },
          "input" => {
        "type" => "log"
    },
       "referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
           "beat" => {
        "hostname" => "localhost.localdomain",
         "version" => "6.8.6",
            "name" => "localhost.localdomain"
    },
         "source" => "/application/log_demo/logstash-tutorial-dataset",
          "ident" => "-",
     "prospector" => {
        "type" => "log"
    },
     "@timestamp" => 2020-02-28T05:51:14.212Z,
          "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\""
}

Indexing your data into Elasticsearch

Now that the web logs are broken down into specific fields, you are ready to get the data into Elasticsearch.

The Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file and replace the entire output section with the following text:

input {
    beats {
        port => "5044"
    }
}

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}

output {
    elasticsearch {
        hosts => ["10.0.0.16:9200"]
    }
}   

Restart Filebeat

rm -f data/registry

./filebeat -e -c filebeat.yml -d "publish"
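
Once Filebeat is publishing again, Logstash writes the events into a date-based logstash-* index. A couple of hedged curl checks, assuming that default index name:

curl 'http://10.0.0.16:9200/_cat/indices/logstash-*?v'
curl 'http://10.0.0.16:9200/logstash-*/_search?pretty&q=response=200' | head -40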

At this point the stack is basically usable: log in to your Kibana and you should be able to see the data. Just change the path above to whatever directory you need to watch, and combine it with rsync for simple day-to-day use. Next I will set up a clustered Elastic deployment.
