之前我们已经可以把kibana把日志展示出来了 现在我们可以增加一些新的功能 比如增加过滤条件,在上面这个文章所构建的环境的基础上。
基本做法就是
在日志挖掘的logstash的配置文件中添加filter条件
这里是在 `logstash/logstash_stdout.conf`中添加
使用Grok Filter Plugin
编辑解析Web日志
现在,您有一个工作流程,从Filebeat读取日志行。但是您会注意到日志消息的格式不理想。您要解析日志消息以从日志中创建特定的命名字段。为此,您将使用grok过滤器插件。
因为grok过滤器插件在传入的日志数据中查找模式,所以配置插件需要您决定如何识别用例感兴趣的模式。来自Web服务器日志示例的代表行如下所示:
信息 | 字段名 |
---|---|
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User agent | agent |
使用方法,编辑 logstash/logstash_stdout.conf
写入下面的内容:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
假如你在这之前已经运行了logstash
和 filebeat
。要想生效现在的过滤配置,您需要强制Filebeat
从头开始读取日志文件。
不必重新启动Logstash
来接收更改,但是需要删除 filebeat
下的注册表文件 registry
,此文件一般在安装目录下的 data
目录下。
由于Filebeat
存储在注册表中收集的每个文件的状态,因此删除注册表文件会强制Filebeat
读取从头开始捕获的所有文件。
接下来,使用以下命令重新启动Filebeat
即可
使用Geoip Filter
插件修改数据
除了解析日志数据以获得更好的搜索之外,过滤插件也可以从现有数据中导出补充信息。例如,geoip插件会查找IP地址,从地址中导出地理位置信息,并将该位置信息添加到日志中。
将Logstash
实例配置为使用geoip
过滤器插件,将以下行添加到文件的filter
部分
geoip {
source => "clientip"
}
完整的示例:
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch { -----输出到ela中
hosts => ["elasticsearch:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
# stdout { codec => rubydebug } # 假如有问题,可以打开此行进行调试
}
修改好保存,生效的话,同样先删除Filebeat
注册文件,之后重启 filebeat
kibana截图展示
测试您的管道编辑
现在,Logstash
管道配置为将数据索引到Elasticsearch
集群中,您可以查询Elasticsearch
。
过滤nginx
的logstash/logstash_stdout.conf
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
if ([fileset][module] == "nginx") {
if ([fileset][name] == "access") {
grok {
match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
geoip {
source => "[nginx][access][remote_ip]"
target => "[nginx][access][geoip]"
}
}
else if [fileset][name] == "error" {
grok {
match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
remove_field => "message"
}
mutate {
rename => { "@timestamp" => "read_timestamp" }
}
date {
match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
remove_field => "[nginx][error][time]"
}
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
manage_template => false
index => "ddd---%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
# stdout { codec => rubydebug } # 假如有问题,可以打开此行进行调试
}
ELA集群
文件 docker-compose.yml 内容
version: "3.2"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
container_name: elasticsearch
networks:
- elk-net
ports:
- "9200:9200"
environment:
- discovery.zen.minimum_master_nodes=2
- node.name=elasticsearch
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=elasticsearch,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
es02:
image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
container_name: es02
networks:
- elk-net
environment:
- discovery.zen.minimum_master_nodes=2
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=elasticsearch,es03
- cluster.initial_master_nodes=elasticsearch,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
es03:
image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
container_name: es03
networks:
- elk-net
environment:
- discovery.zen.minimum_master_nodes=2
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,elasticsearch
- cluster.initial_master_nodes=elasticsearch,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
filebeat:
image: docker.elastic.co/beats/filebeat:7.4.2
container_name: filebeat
volumes:
- type: bind
source: "./filebeat/2018.log"
target: "/2018.log"
- type: bind
source: "./filebeat/filebeat.yml"
target: "/usr/share/filebeat/filebeat.yml"
networks:
- elk-net
depends_on:
- logstash
logstash:
image: docker.elastic.co/logstash/logstash:7.4.2
container_name: logstash
volumes:
- type: bind
source: "./logstash/logstash_stdout.conf"
target: "/usr/share/logstash/pipeline/logstash.conf"
networks:
- elk-net
depends_on:
- elasticsearch
kibana:
image: docker.elastic.co/kibana/kibana:7.4.2
container_name: kibana
networks:
- elk-net
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
elk-net:
driver: bridge
ELA服务里的环境变量说明
discovery.zen.minimum_master_nodes=2 #设置最新主节点数,有助于防止脑裂
node.name=elasticsearch #指定节点名字
cluster.name=es-docker-cluster #指定集群名字
discovery.seed_hosts=es02,es03 #写入除自己以外的候选主节点的设备地址,来开启服务时就可以被选为主节点
cluster.initial_master_nodes=elasticsearch,es02,es03 #写入全部的候选主节点的设备地址,来开启服务时就可以被选为主节点
bootstrap.memory_lock=true #在ES运行起来后锁定ES所能使用的堆内存大小,锁定内存大小一般为可用内存的一半左右;锁定内存后就不会使用交换分区。如果不打开此项,当系统物理内存空间不足,ES将使用交换分区,ES如果使用交换分区,那么ES的性能将会变得很差
docker-compose up
后访问http://localhost:9200/
使用谷歌的Elasticsearch Head
插件,可观察到主节点和从节点信息
-
粗框是
主节点
,细框是从节点
对于节点说明
设置最新主节点数
minimum_master_nodes 设定对你的集群的稳定 极其 重要
当你的集群中有两个 masters(注:主节点)的时候,这个配置有助于防止 脑裂 。
如果你的集群发生了脑裂,那么你的集群就会处在丢失数据的危险中,因为主节点被认为是这个集群的最高统治者,它决定了什么时候新的索引可以创建,分片是如何移动的等等。如果你有 两个 masters 节点, 你的数据的完整性将得不到保证,因为你有两个节点认为他们有集群的控制权,就会导致冲突。
此设置应该始终被配置为 master 候选节点的法定个数(大多数个)。法定个数就是 ( master 候选节点个数 / 2) + 1 。 这里有几个例子:
如果你有 10 个节点(能保存数据,同时能成为 master),法定数就是 6 。
如果你有 3 个候选 master 节点,和 100 个 data 节点,法定数就是 2 ,你只要数数那些可以做 master 的节点数就可以了。
如果你有两个节点,你遇到难题了。法定数当然是 2 ,但是这意味着如果有一个节点挂掉,你整个集群就不可用了。 设置成 1 可以保证集群的功能,但是就无法保证集群脑裂了,像这样的情况,你最好至少保证有 3 个节点。
建议这样配置:
discovery.zen.minimum_master_nodes: 2
但是由于 ELasticsearch 是动态的,你可以很容易的添加和删除节点, 但是这会改变这个法定个数。 你不得不修改每一个索引节点的配置并且重启你的整个集群只是为了让配置生效。
解决办法是同时添加如下的配置项:
PUT /_cluster/settings
{
"persistent" : {
"discovery.zen.minimum_master_nodes" : 2
}
}
这个配置允许通过 API 调用的方式动态进行配置。
这将成为一个永久的配置,并且无论你配置项里配置的如何,这个将优先生效。
当你添加和删除 master 节点的时候,你需要通过 API 的方式更改这个配置。