服务搭建准备工作:
1. 在linux下创建一个目录,一般默认创建在/home/**/** 路径下
例如: mkdir -p /home/gdca/elk
2. 创建一个新的用户,es默认不允许使用root用户启动
创建用户:linux: useradd elk
设置用户密码:passwd elk
给用户分配目录权限: chown elk:elk -R /home/gdca/elk
一、开始正式搭建elasticsearch服务
首先下载ES,这里我们采用的版本是7.6.0。我们进入到/home/gdca/elk目录下,下载elasticsearch7.6.0
curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.0-linux-x86_64.tar.gz
然后解压文件: tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz, 解压成功以后进入到elasticsearch-7.6.0目录中。
修改ES配置文件参数:
es的所有配置文件都在${ES_HOME}/config这个目录下,首先我们设置一下jvm参数,打开jvm.options文件
vim jvm.options
################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms256m
-Xmx256m
我们改一下堆内存的大小,我这里这里统一调整为256m内存,大家可以根据自己机器的内存情况进行调整。
然后,打开elasticsearch.yml文件,配置一下这里边的参数。
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: cluster-a
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-130
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
我们先配置一下集群的名字,也就是cluster.name,在这里,我们叫做cluster-a。在另外两台机器上,集群的名字也要叫做cluster-a,这样才能够组成一个集群。在ES中,集群名字相同的节点,会组成ES集群。
然后,我们再修改node.name节点名称,这个名称是每一个节点的,所以,每个节点的名称都不能相同。这里我们以ip命名,130这台机器,节点名称就叫node-130,另外两台叫做node-131和node-132。
我们再接着看后面的配置,
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.73.130
#
# Set a custom port for HTTP:
#
#http.port: 9200
路径和内存,咱们使用默认的就好,咱们重点看一下网络。我们需要指定一下ES绑定的地址,如果不设置,那么默认绑定的就是localhost,也就是127.0.0.1,这样就只有本机能够访问了,其他机器是访问不了的。所以这里我们要绑定每台机器的地址,分别是192.168.73.130,192.168.73.131,192.168.73.132。
接下来,我们看一下集群的相关配置,
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.seed_hosts: ["192.168.73.130", "192.168.73.131","192.168.73.132"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
cluster.initial_master_nodes: ["node-130", "node-131", "node-132"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
也就是Discovery这一段的配置,我们先设置一下集群中节点的地址,也就是discovery.seed_hosts这一段,我们把3台机器的ip写在这里。然后再把3台机器的节点名称写在cluster.initial_master_nodes,好了,集群的配置到这里就告一段落了。
系统配置(重点:如果不做系统配置es启动会出错,后台运行模式会无法启动)
修改limits.conf文件,我们切换到root用户,打开limits.conf文件,添加内容如下图
调整mmapfs的数值
由于ES是使用mmapfs存储索引,但是系统的默认值太低了,我们调高一点。
sysctl -w vm.max_map_count=262144
接下来就可以启动es了
./bin/elasticsearch, 如果需要后台启动运行:./bin/elasticsearch -d
好,到这里我们所有的配置就完成了,现在依次启动3个节点的ES。启动完成后,我们在浏览器中检查以下集群的状态: http://192.168.73.130:9200/_cluster/health,
{"cluster_name":"cluster-a","status":"green","timed_out":false,"number_of_nodes":3,"number_of_data_nodes":3,"active_primary_shards":0,"active_shards":0,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":0,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":100.0}
二、搭建Logstash服务
注意ELK服务搭建时候要选择同一个版本,不然容易出现各种莫名其妙的问题
同样进入config配置文件,配置一下jvm启动参数,这边配置也用256m的启动内存,实际需要根据具体环境扩展: vim ./config/jvm.option
同样修改一下基础配置文件: vim ./config/logstash.yml, 主要改动http_ip 从127.0.0.1 修改为 0.0.0.0 支持ip地址访问,端口号http_port:9600 使用默认参数,这边根据实际业务需要做调整,这里我采用基本默认的推荐参数。
推荐logstash 启动方式为 ./bin/logstash.sh -f 配置文件路径 & ,注意这里一定要配置conf文件,为了方便后台启动,要配置访问路径和es路径,不能直接配置stdin{}, 这样后台启动会等待命令行输入测试内容,而卡在命令行执行导致系统无法正常打开端口启动!!!重点,我这里被坑了2个多小时
Logstash配置基础:
input plugin 让logstash可以读取特定的事件源
示范:
input{
file{
path => "/var/log/nginx/access.log"
start_position => "beginning"
type => "nginx_access_log"
}
}
说明:
file 从文件读取数据
file{
path => ['/var/log/nginx/access.log'] #要输入的文件路径
type => 'nginx_access_log'
start_position => "beginning"
}
# path 可以用/var/log/*.log,/var/log/**/*.log,如果是/var/log则是/var/log/*.log
# type 通用选项. 用于激活过滤器
# start_position 选择logstash开始读取文件的位置,begining或者end。一般来说默认begining
还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网,这部分暂时没看懂等待后续补充。
syslog 通过网络将系统日志消息读取为事件
syslog{
port =>"514"
type => "syslog"
}
# port 指定监听端口(同时建立TCP/UDP的514端口的监听)
#从syslogs读取需要实现配置rsyslog:
# cat /etc/rsyslog.conf 加入一行
*.* @172.17.128.200:514 #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取
# service rsyslog restart #重启日志服务
beats 从Elastic beats接收事件
beats {
port => 5044 #要监听的端口
}
# 还有host等选项
# 从beat读取需要先配置beat端,从beat输出到logstash。
# vim /etc/filebeat/filebeat.yml
..........
output.logstash:
hosts: ["localhost:5044"]
kafka 将 kafka topic 中的数据读取为事件
kafka{
bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
topics => ["access_log"]
group_id => "logstash-file"
codec => "json"
}
kafka{
bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
topics => ["weixin_log","user_log"]
codec => "json"
}
# bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。
# topics 要订阅的主题列表,kafka topics
# group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id# codec 通用选项,用于输入数据的编解码器。
以上是常用的一些input插件,还有很多的input插件类型,可以参考官方文档来配置。
filter plugin 过滤器插件,对事件执行中间处理。
grok 解析文本并构造 。把非结构化日志数据通过正则解析成结构化和可查询化
grok {
match => {"message"=>"^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
}
匹配nginx日志
# 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
#220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
注意这里grok 可以有多个match匹配规则,如果前面的匹配失败可以使用后面的继续匹配。例如
grok {
match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
}
grok 语法:%{SYNTAX:SEMANTIC} 即 %{正则:自定义字段名}
官方提供了很多正则的grok pattern可以直接使用 :https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debug工具: http://grokdebug.herokuapp.com
正则表达式调试工具: https://www.debuggex.com/
需要用到较多的正则知识,参考文档有:https://www.jb51.net/tools/zhengze.html
自定义模式:(?<字段名>the pattern)
例如: 匹配 2018/06/27 14:00:54
(?<datetime>\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d)
得到结果: "datetime": "2018/06/27 14:00:54"
output plugin 输出插件,将事件发送到特定目标。
stdout 标准输出。将事件输出到屏幕上
output{
stdout{
codec => "rubydebug"
}
}
file 将事件写入文件
file {
path => "/data/logstash/%{host}/{application}
codec => line { format => "%{message}"} }
}
kafka 将事件发送到kafka
kafka{
bootstrap_servers => "localhost:9092"
topic_id => "test_topic" #必需的设置。生成消息的主题
}
elasticseach 在es中存储日志
elasticsearch {
hosts => "localhost:9200"
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
#index 事件写入的索引。可以按照日志来创建索引,以便于删旧数据和按时间来搜索日志