ELK
[TOC]
Elasticsearch
下载
curl -O https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.5.2.tar.gz
安装
mv elasticsearch-1.5.2.tar.gz /app
cd /app
tar xf elasticsearch-1.5.2.tar.gz
cd elasticsearch-1.5.2
后台运行与测试
bin/elasticsearch -d
ss -ntal # 9200 和 9300
curl 'http://localhost:9200/?pretty'
关闭
curl -XPOST 'http://localhost:9200/_shutdown'
配置
配置文件
配置文件的格式是 YAML 格式
/app/elasticsearch-1.5.2/config/elasticsearch.yml # 用于配置不同模块的属性,如:网络地址、路径
/app/elasticsearch-1.5.2/config/logging.yml # 用于配置自身的日志记录选项
安装插件
elasticsearch-head 是一个简单的 elasticsearch web 图形化的管理工具
使用javascript写的一个弹性搜索集群的Web前端,在 elasticsearch5.x 后不再支持
插件形式的安装,需要作为一个独立的服务安装和启动。
- 下载
可以在 github 中搜索, elasticsearch-head
之后点击 mobz/elasticsearch-head,找到后选择下载安装包,或者克隆都行
git clone git@github.com:mobz/elasticsearch-head.git
- 安装
安装需要使用 npm 命令安装,centos 用 yum 安装 npm 即可
mac 安装 node 后就会自动带 npm 命令
yum install npm
cd elasticsearch-head
npm installl
- 启动
npm run start
上面的方式我测试没有成功,原因可能是网络问题导致,访问的都是国外的网站,慢。
推荐下面的方式
浏览器访问: http://localhost:9100/
还有一种安装方式非常简单,只需要在 chrome 浏览器中安装 head 插件即可
在 谷歌商店 搜索 "ElasticSearch Head" 插件,安装这个插件,之后点击 相应插件图标即可。
插件安装地址
https://chrome.google.com/webstore/detail/elasticsearch-head/ffmkiejjmecolpfloofpjologoblkegm
之后,在浏览地址栏里输入 elasticsearch 的地址和端口,点击连接
安装完毕
Filebeat
下载
curl -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.6.3-linux-x86_64.tar.gz
安装
# tar xf filebeat-5.6.3-linux-x86_64.tar.gz -C /app/
# cd /app
# ln -s filebeat-5.6.3-linux-x86_64 filebeat
配置 Filebeat
- 配置文件名
filebeat.yml
== 该配置文件的所有者必须是root或正在执行Beat进程的用户。 该文件的权限必须不允许除所有者以外的任何人写入,需要是0644的权限==
- 配置输入源 input
filebeat.prospectors:
- input_type: log
paths:
- /var/log/*.log
==注意:路径的最后一个不能用完全用 * 代替所有,必须有一个相对的匹配或者绝对的匹配。==
==/var/log/* 这样的匹配是错误的==
# 相对匹配
/path/to/*.log
/path/to/www*
/path/to/*2017*
# 绝对匹配
/path/to/www.xxx.com_2017.11.08.log
- 假如你想匹配子目录可以使用下面的模式
filebeat.prospectors:
- input_type: log
paths:
- /var/log/*/*.log
==同样的,上面的方式,只会匹配到 /var/log 目录下的每个子目录下所有以 .log 结尾的所有日志文件,但是不会匹配到 /var/log 目录下的任何日志文件。==
- 配置将日志2输入到 elasticsearch
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["172.16.153.129:9200"]
- 配置 output 到 logstash
#----------------------------- Logstash output --------------------------------
output.logstash:
hosts: ["127.0.0.1:5044"]
- 检查你的配置文件语法,在 filebeat 安装目录下执行下面的命令
./filebeat.sh -configtest -e.
配置加载的模板
- 自动加载模板
默认情况下,Filebeat 会自动加载推荐的模板文件 filebeat.template.json,如果启用了 Elasticsearch 输出。您可以通过调整 filebeat.yml 文件中的选项template.name和template.path选项来配置filebeat以加载其他模板:
output.elasticsearch:
hosts:[“localhost:9200”]
template.name:“filebeat”
template.path:“filebeat.template.json”
template.overwrite:false
==默认情况下,如果索引中已存在模板,则不会覆盖该模板。要覆盖现有的模板,请template.overwrite: true在配置文件中进行设置。==
要禁用自动模板加载,请注释Elasticsearch输出下的 template 的相关部分。
==如果使用Logstash输出,则不支持自动加载模板的选项,需要手动加载模板。==
- 手动加载模板
假如配置了输出到 Logstash 中,需要手动添加索引模板到 Elasticsearch 中,就需要在 filebeat 的安装目录中执行如下命令
curl -H 'Content-Type: application/json' -XPUT 'http://172.16.153.129:9200/_template/filebeat' -d@filebeat.template.json
# 下面是返回结果, true 证明成功,上面的 172.16.153.129:9200
# 是 Elasticsearch 中设置的监听地址和端口
{"acknowledged":true}
假如你之前已经用了输出到 elasticsearch 的索引模板,可以在成功执行上面命令后,执行下面的命令,用于删除旧的索引模板,这样可以强迫 Kibana 重新使用新的索引模板
curl -XDELETE 'http://172.16.153.129:9200/filebeat-*'
检查你的配置
./filebeat -configtest
后台启动
setsid ./filebeat -e -c filebeat.yml -d “publish”
要在 Kibana 中查看数据,假如首次使用 kibana,需要先创建索引模式,具体请看 Kibana 部分的介绍
创建索引模式后,您可以 filebeat-* 在 Kibana 中选择索引模式来探索 Filebeat数据。
- 关闭
ps -ef |grep filebeat # 找到进程号,kill -9 进程号即可
Logstash
数据在线程之间以 事件 的形式流传。不要叫行,因为 logstash 可以处理多行事件
下载
curl -O http://download.elastic.co/logstash/logstash/logstash-1.5.0.tar.gz
安装
tar -xf logstash-1.5.0.tar.gz
cd logstash-1.5.0
bin/logstash -e 'input {stdin {} } output { stdout { codec => rubydebug } }'
- 上面的启动方式是,用 -e 参数指定以标准输入和标准输出的方式来运行 Logstash
测试
此时可以在命令行里输入一下信息,应该可以看到一些输出
# bin/logstash -e 'input {stdin {} } output { stdout { codec => rubydebug } }'
Logstash startup completed
Hello PackPUb # 输入的信息
{ # 以下为输出信息
"message" => "Hello PackPUb",
"@version" => "1",
"@timestamp" => "2017-10-22T19:57:50.698Z", # 事件被索引的时间
"host" => "0.0.0.0"
}
message => 完整的输入信息或者事件。
@timestamp => 事件被索引的时间。 如果使用了日期过滤插件,也可以是 message 中的某个指定事件时间的字段
host => 表示事件发生的主机
文件输入插件
可以利用文件输入插件,Logstash 可以很容易配置成读取日志文件作为输入源
例如 读取 Apache 日志文件作为输入,然后输出到标准输出。配置文件如下:
# bin/logstash -e 'input { file { type=>"django" path=>"/app/ansible/debug/all.log" } } output { stdout { codec => rubydebug } }'
输出插件
配置为将所有的输入输出到一个 Elasticsearch 实例中. 这是在 ELK 平台中是最常用的配置。
这里是把一个Django 的日志作为输入,之后输出到 elasticsearch
bin/logstash -e 'input { file { type=>"log" path=>"/app/ansible/debug/all.log" } } output { elasticsearch { host => localhost } }'
Logstash 插件
列出插件
- 列出所用插件
bin/logstash-plugin list
# 输出
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
...
- 列出插件名中包含特定模式的插件
bin/logstash-plugin list logstash-filter-cidr
好像没啥卵用
- 列出指定分组插件
分组可以是: input/output/filter/codec
bin/logstash-plugin list --group output
插件类型
插件类型包括:
- 输入插件 (input)
- 过滤插件 (filter)
- 输出插件 (output)
- 编解码器 (codec)
下面只介绍生产环境中经常使用的插件和插件属性
输入插件
文件 (file)
Logstash 可以从 file 读取数据
file 插件可用于将事件和日志文件输入给 Logstsh
input {
file {
path => "/path/to/logsfiles"
}
}
标准输入(stdin)
stdin 输入插件用于从标准输入中读取事件和日志。
input {
stdin {
}
}
当我们配置为 stdin 时,终端上的任何输入都会作为 Logstash 日志管道的输入。
这通常用于测试。
filebeat (beats)
filebeat 插件出现在最新的 5.x 版本中,它是一个轻量级的事件和日志搜集工具。
input {
beats {
port => "5044"
}
}
输出插件
- stdout 标准输出
会把处理过的数据输出到屏幕终端
output {
stdout {
codec => rubydebug
}
}
- elasticsearch
把处理过的数据存储到 elasticsearch 集群中
output {
elasticsearch {
hosts => ["20.200.21.21:9200"]
codec => "json"
}
}
Logstash 插件的属性的数据类型
数组
python => ["value1","value2"]
布尔值
true 和 false
periodic_flush => false
编解码器(Codec)
编辑码器实际上并不是一种数据类型,它是在输入或输出的时候对数据进行解码或编码
的一种方式
例子:
在输出的时候设置,把输出数据编码成 JSON 格式
codec => "json"
哈希 (Hash)
就是由一系列键值对组合成的集合。
以 "key" => "value" 的形式表示,多个用空格分开
match => {
"key1" => "value1" "key1" => "value2"
}
字符串
用双引号引起来的字符序列
value => "Welcome to ELK"
注释(Comment)
以字符 # 开头
# 我是注释
字段引用
字段可以使用 [field_name] 的方式引用
嵌套字段 [field1][field2]
支持下标索引
小贴士:logstash 的数组也支持倒序下标,即 [geoip][location][-1] 可以获取数组最后一个元素的值。
Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
"the longitude is %{[geoip][location][0]}"
Logstash 条件语句
Logstsh 的判断语句,与js的一样
if (val == 0) {
# 一些处理语句
}else if (val == "str") {
# 一些处理语句
}else {
# 其他的处理语句
}
条件语句中的小括号可以加,也可以不加
条件语句可以一起配合使用的运算符如下:
- 相等运算符: ==, !=, <, >, <=, >=
- 正则表达式: =~, !~
- 包含: in, not in
- 逻辑运算符: and(与), or(或), nand(非与), xor(非或)
- 复合表达式: ()
- 对复合表达式的结果取反: !()
- 单目运算符: !
通常来说,你都会在表达式里用到字段引用。为了尽量展示全面各种表达式,下面虚拟一个示例:
if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
} else {
}
配置文件
要配置Logstash,您可以创建一个配置文件,指定要使用的插件和每个插件的设置。 您可以在配置中引用事件字段,并使用条件来满足某些条件时处理事件。 当您运行logstash时,使用-f指定您的配置文件。
可以把 Logstash 相关的配置信息放在一个自定义的文件里
# mkdir /app/logstash-1.5.0/conf
# vi /app/logstash-1.5.0/conf/first-pipeline.conf
配置文件的格式是被分为不同的区域块儿的,基本上是根据 输入插件、过滤插件和输出插件来区分的
每一个区域块儿都包含了一个或者多个插件的配置选项。
==如果在同一个区域中使用了多个插件,则配置文件中的顺序就指定了应用到事件处理六的顺序。==
在运行 Logstash 运行时,可以用 -f 参数指定配置文件的完整路径,甚至可以指定一个包含了多个不同类型如输入、过滤和输出插件的配置文件目录
使用 -t (--configtest) 参数可以检查配置文件的语法,而不是真正运行 Logstash
测试配置文件
# ./bin/logstash -f ./config/first-pipeline.conf --config.test_and_exit 或者
# ./bin/logstash -t -f ./conf/first-pipeline.conf
Configuration OK
[2017-10-31T02:00:26,184][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
如果配置文件通过配置测试,请使用以下命令启动Logstash:
bin/logstash -f ./conf/first-pipeline.conf --config.reload.automatic
该--config.reload.automatic选项启用自动配置重新加载,以便您每次修改配置文件时不必停止并重新启动Logstash。
--config.reload.automatic 可以用 -r 替代
配置文件常见错误:
下面是完整正确的配置文件内容
bin/logstash -t -f conf/first-pipeline.conf
Configuration OK
用指定配置文件的方式启动 Ctrl + d 停止 logstash 在 shell 命令行里执行的启动程序first-pipeline.conf
后台启动
# setsid bin/logstash -r -f conf/first-pipeline.conf
# setsid 是放在后台运行,并且忽略 HUP 信号来使我们的进程避免中途被中断
- 关闭
ps -ef |grep logstash 或者
ps -ef |grep java
kill -9 进程号
运行常用参数
-f 指定配置的文件名或目录
-l 将logstash内部日志写入给定目录。 没有这个标志。
-r, --config.reload.automatic 监视配置的更改,重新加载配置文件
-w,--pipeline.workers COUNT 运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。
其实上面的命令参数在 5.x 之后,都可以在配置文件 logstash.yml 中设置了
使用Grok Filter Plugin 编辑解析Web日志
现在,您有一个工作流程,从Filebeat读取日志行。但是您会注意到日志消息的格式不理想。您要解析日志消息以从日志中创建特定的命名字段。为此,您将使用grok过滤器插件。
因为grok过滤器插件在传入的日志数据中查找模式,所以配置插件需要您决定如何识别用例感兴趣的模式。来自Web服务器日志示例的代表行如下所示:
信息 | 字段名 |
---|---|
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User agent | agent |
使用方法,编辑 first-pipeline.conf
写入下面的内容:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
假如你在这之前已经运行了 logstash 和 filebeat 。要想生效现在的过滤配置,您需要强制Filebeat从头开始读取日志文件。
不必重新启动Logstash来接收更改,但是需要删除 filebeat 下的注册表文件 registry,此文件一般在安装目录下的 data 目录下。
由于Filebeat存储在注册表中收集的每个文件的状态,因此删除注册表文件会强制Filebeat读取从头开始捕获的所有文件。
接下来,使用以下命令重新启动Filebeat即可
使用Geoip Filter插件修改数据
除了解析日志数据以获得更好的搜索之外,过滤插件也可以从现有数据中导出补充信息。例如,geoip插件会查找IP地址,从地址中导出地理位置信息,并将该位置信息添加到日志中。
将Logstash实例配置为使用geoip过滤器插件,将以下行添加到文件的filter部分first-pipeline.conf
geoip {
source => "clientip"
}
完整的示例:
input {
beats {
port => "5043"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
stdout { codec => rubydebug }
}
修改好保存,生效的话,同样先删除 Filebeat 注册文件,之后重启 filebeat
配置输出到 Elasticsearch
input {
beats {
port => "5043"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
测试您的管道编辑
现在,Logstash管道配置为将数据索引到Elasticsearch集群中,您可以查询Elasticsearch。
根据grok过滤器插件创建的字段,尝试对Elasticsearch进行测试查询。使用YYYY.MM.DD格式将$ DATE替换为当前日期
curl -XGET '172.16.153.129:9200/logstash-$DATE/_search?pretty&q=response=200'
输出
{
"error" : {
"root_cause" : [
{
"type" : "index_not_found_exception",
"reason" : "no such index",
"resource.type" : "index_or_alias",
"resource.id" : "logstash-$DATE",
"index_uuid" : "_na_",
"index" : "logstash-$DATE"
}
],
"type" : "index_not_found_exception",
"reason" : "no such index",
"resource.type" : "index_or_alias",
"resource.id" : "logstash-$DATE",
"index_uuid" : "_na_",
"index" : "logstash-$DATE"
},
"status" : 404
}
注意
索引名称中使用的日期是基于UTC,而不是Logstash运行的时区。如果查询返回index_not_found_exception,请确保logstash-$DATE反映了索引的实际名称。要查看可用索引的列表,请使用此查询:curl 'localhost:9200/_cat/indices?v'
# curl '172.16.153.129:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open logstash-2017.10.23 E5i7TpirSTO_RmOwZarBLg 5 1 15 0 40kb 40kb
- 列出集群中的所有节点
➜ ~ curl '172.16.153.129:9200/_cat/nodes?v'
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.16.153.129 38 96 11 0.46 2.23 2.12 mdi * VXgFbKA
- 列出集群健康状态
➜ ~ curl '172.16.153.129:9200/_cluster/health?pretty=true'
{
"cluster_name" : "elasticsearch",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 131,
"active_shards" : 131,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 131,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 50.0
}
Kibana
下载
curl -O https://artifacts.elastic.co/downloads/kibana/kibana-5.6.3-linux-x86_64.tar.gz
安装
# tar -xf kibana-5.6.3-linux-x86_64.tar.gz
# ln -s kibana-5.6.3-linux-x86_64 kibana
# cd kibana
配置
# cd /app/kibana/config
# vi kibana.yml
server.port: 5601
server.host: "localhost"
elasticsearch.url: "http://localhost:9200"
运行 Kibana
# cd /app/kibana
# bin/kibana
- 关闭
ps -ef |grep node
kill -9 进程号
验证安装
在浏览其中输入以下地址
http://localhost:5601