ELK

现状:排查现场异常问题时,通常需要查询日志,对于微服务架构或分布式部署,日志比较分散,需要查找多处的日志进行分析,导致问题排除效率低。


ELK是一个集中式日志系统的技术栈,由下列部分组成

  1. Elasticsearch:实时的分布式的Restful 风格的搜索和分析引擎,能对大容量的数据进行接近实时的存储、搜索和分析操作。
  2. Logstash:具有实时渠道能力的数据收集引擎,用于收集不同数据源的日志,并将数据标准化处理后传输到用户指定的位置(本集群中为ES)。
  3. Kibana:通常与 Elasticsearch 配合使用,对其中数据进行搜索、分析并生成各种维度的图表,为 Elasticsearch提供分析和可视化的 Web 平台。
  4. Beats,可作为logstash的input输入或直接存入ES
    4.1 Filebeat搜集日志:解决Logstash占用服务器资源过多的问题。在需要采集日志数据的服务器上安装 Filebeat,指定日志目录或日志文件后, Filebeat就能读取数据,迅速发送到Logstash进行解析,或直接发送到Elasticsearch进行集中式存储和分析。
    4.2 Metricbeat:系统级性能指标监控工具,收集CPU、内存、磁盘等系统指标和Apache/MySQL/Redis等服务指标
    4.3 Heartbeat:心跳检测工具,监控服务可用性
    4.4 Packetbeat:网络数据包分析工具

官网手册
ELK版本:最新版本8.2.3
搭建环境:Linux x86_64 CentOS7.6
默认不支持以root启动,切换elk用户

服务启动

Elasticsearch

服务启动成功后,访问https://{服务器IP}:9200/
默认开启ssl,需要输入初次启动创建的用户名和密码

Kibana

服务启动成功后,访问http://{服务器IP}:5601/
如果从非部署本机的服务器访问,需要修改配置后重启,否则无法访问
vim config/kibana.yml
server.host : "部署服务器IP"
初次访问需要配置Elastic,选择Configure manually,用户名使用默认的kibana_system,密码通过忘记密码到elasticsearch下重置

Logstash

配置管道,输入为控制台输入,输出到Elasticsearch
vim config/pipelines.yml

ELK基础配置说明

Elasticsearch

配置项 说明
path.data 存放数据的目录,建议放到安装路径外
path.logs 存放日志的目录,建议放到安装路径外
cluster.name 集群名称,自定义,默认为elasticsearch
node.name 节点名称,自定义,默认为主机名
network.host 服务IP,默认为127.0.0.1,修改为主机IP,否则无法在非本机访问
http.port 服务启动端口,默认为9200
xpack.security.enabled 是否开启安全功能,默认为true,建议开启,为false时直接通过http访问,无身份认证等安全校验
xpack.security.http.ssl:enabled 是否开启TLS/SSL,开启安全功能时生效,true/false,https还是http访问,需要身份验证

Kibana配置kibana.yml

配置项 说明
server.host 默认为localhost,无法被其他机器访问 服务器IP
server.port 服务启动端口 默认为5601
elasticsearch.hosts Elasticsearch的访问url,开启SSL/TLS时为https 支持同一集群的多个节点url
elasticsearch.username Elasticsearch的用户名,开启安全功能时配置 限制超级账户elastic,推荐kibana_system
elasticsearch.password Elasticsearch的密码,开启安全功能时配置 elasticsearch-reset-password获取密码
elasticsearch.ssl.certificateAuthorities CA证书路径,开启TLS/SSL时配置 elasticsearch/config/certs获取crt证书

Logstash配置

logstash.yml

配置项 说明
node.name 默认为本机名 自定义
pipeline.* 管道配置 在pipelines.yml中配置
config.test_and_exit 检查配置是否有效然后退出 默认为false
config.reload.automatic 自动重载配置 默认为false
log.level 日志级别 默认为info
path.logs 日志路径 默认为logs

Logstash多管道配置pipelines.yml

配置项 说明
pipeline.id 管道ID,必填,以-开头标识一组管道配置 自定义
pipeline.workers 并行数 默认为服务器CPU内核数
pipeline.batch.size 传送给filter和output之前,从input收集的最大事件数 默认为125
config.string 管道配置字符串 自定义,参照下方配置组成说明
path.config 管道配置文件,可以是目录,模板为logstash-sample.conf 自定义,Logstash安装目录的相对路径,文件内容参照下方配置组成说明
pipeline.ordered 是否设置管道事件顺序 默认为auto
queue.type 事件缓存方式,memory基于内存,persisted基于磁盘 默认为memory

管道配置config.string/path.config

一个Logstash管道由三个组件组成:input日志输入端、filter日志解析端、output日志输出端。
管道配置中需指明启动的组件,其中filter 是可选的。

input

公共配置项
  • type,input组件添加type标签
  • tags,input组件添加tags标签,数组
  • codec,输入数据的解码器,默认为plain,常用为json
  • add_field,自定义属性,哈希键值对集合,格式为"field1" => "value1"
http
http {
        port => "9090"
    } 

有阻塞的风险,客户端会超时,但会处理事件发送

  • host,ip地址,默认为0.0.0.0
  • port,监听端口,默认为8080
  • user,基本认证的用户名,默认为空,设置时开启身份认证
  • passowrd,基本认证的密码,默认为空,设置时开启身份认证
  • additional_codecs,默认为"application/json" => "json",优先级高于公共配置项codec;建议请求使用json,会自动解析字段存入ES
  • max_content_length,http请求内容的最大字节数,默认为100M
  • max_pending_requests,队列的最大请求数,默认为200
  • threads,线程数,默认为处理器数量
tcp***
    tcp {
        port => "9191"
        codec => json
    } 

每个事件为一行文本

  • host,ip地址,默认为0.0.0.0
  • port,端口,无默认值,必填项
  • mode,操作模式,默认为server
  • tcp_keep_alive,boolean,是否保持连接,默认为false
    结合SocketAppender支持接收Log4j2日志,以json格式
  • log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appenders>
        <console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n" />
        </console>
        <Socket name="Socket" host="logstash服务ip" port="logstash-input-tcp监听端口">
            <JsonLayout compact="true" eventEol="true" />
        </Socket>
    </appenders>
    <loggers>
        <root level="INFO">
            <appender-ref ref="Console" />
            <AppenderRef ref="Socket" />
        </root>
    </loggers>
</configuration>
kafka***
   kfaka {
        bootstrap_servers => "172.30.12.192:9092"
        topics => ["logstash-test"]
        group_id => "logstash_kfaka "
    } 
  • bootstrap_servers,kfaka实例的url,多个以,分隔
  • consumer_threads,消费者线程数,与kfaka分区一样多;运行多个线程可以增加吞吐量
  • client_id,请求客户端标识,传递给服务端用于跟踪请求来源
  • group_id,消费者所属组标识,默认为logstash;单个管道中如果配置多个kfaka插件从不同的topic读取时,需使用不同的group_id且建议设置唯一的client_id,运行多个具有相同group_id的logstash实例进行负载均衡
  • topics,订阅的主题列表,默认为["logstash"]
  • topics_pattern,订阅的主题正则表达式,使用该配置时忽略topics
  • decorate_events,装饰事件,none/basic/extended,默认为none,非none时添加kfaka信息字段到@metadata下,比如topic /offset/key/consumer_group/partition
  • poll_timeout_ms,等待读取新消息的时间(毫秒),默认为100毫秒
  • connections_max_idle_ms,空闲连接保持时间(毫秒),默认为540000(9分钟)
  • request_timeout_ms,等待请求响应超时时间,默认为40秒
  • auto_commit_interval_ms,提交消费者偏移量给kfaka的频率(毫秒),默认为5000
  • enable_auto_commit,是否自动提交消费者偏移量,默认为true,需修改为false,否则可能会造成数据丢失
  • auto_offset_reset,在kfaka中无初始偏移量或超出范围时的处理方式,earliest/latest/none/anything else
  • check_crcs,检查数据传输是否完成,默认为true,会额外增加开销,追求性能的情况下可禁用
beats*
    beats {
        port => "5044"
    }

建议输出的ES索引与beats版本关联,index =>"%{[@metadata][beat]}-%{[@metadata][version]}-%{+yyyyMMdd}"

  • port,必填,监听端口
  • client_inactivity_timeout,X秒无活动后关闭空闲客户端
  • executor_threads,处理beats请求的线程数,默认为一个CPU核数一个线程

filter

获取输入中的属性字段或新增字段,进行格式化或者业务逻辑处理,可传递到输出组件使用,使用[字段名]引用字段

公共配置项
  • add_field,自定义字段,哈希键值对集合,格式为"field1" => "value1",%{field}引用input事件中的字段,对于不需要输出到日志事件里的字段定义,格式为" [@metadata][字段名] " => "value",通过%{[@metadata][target_index]}"引用
  • add_tag,自定义标签,数组,%{field}引用input事件中的字段
  • remove_field,删除字段,%{field}引用input事件中的字段
  • remove_tag,删除标签,%{field}引用input事件中的字段
date日期解析插件
date {
  timezone=> "Asia/Shanghai"
  match => ["logdate","yyyyMMdd"]
  target => "@timestamp" 
}
  • match,数组,第一个为字段名,后面为格式(可有多个),格式需要与字段值匹配,解析字段中的日期作为事件的时间戳
  • timezone,时区,@timestamp默认为UTC时间
  • target,目标字段,默认为"@timestamp"
gork正则捕获
grok {
  match => {  "message" => "Duration: %{NUMBER:duration}" }
 }

kibana附带grok规则在线测试

  • match,通过正则表达式解析非结构化文本,定义查找位置(message字段)和模式,语法是%{正则表达式或预定义的匹配模式:赋值字段名称},gork预定义的匹配模式有
    IP,IP地址
    URIPATH,URI路径
    WORD,匹配字符串,包括数字和大小写字母
    NOTSPACE,不带任何空格的字符串
    NUMBER,数字
    TIMESTAMP_ISO8601,2016-07-03T00:34:06+08:00样式时间戳
    自定义正则表达式为(?<field_name>the pattern here)
  • "message" => [ ],从一个字段匹配多个模式时,可以赋值为数组
  • 模式依赖前一个模式创建的字段时,需要单独的gork插件
  • overwrite,覆盖已有的字段值,数组,值为"字段名"
  • patterns_dir,匹配模式文件路径,文件内容为模式名称 表达式
  • patterns_files_glob,匹配patterns_dir下的文件,默认为*
  • tag_on_failure,匹配失败添加标签,数组,默认为["_grokparsefailure"]
  • tag_on_timeout ,匹配超时添加标签,默认为"_groktimeout"
  • target,匹配结果保存的目标命名空间
  • timeout_millis,每个匹配模式的超时时间,毫秒,默认为30000
json解析
    json {
        source => "source_field"
    }
  • source,必填,要从json格式的输入事件中解析的字段名称
  • tag_on_failure
  • target,配置该项放入事件的目标字段中,为空默认放在顶层
  • json多层的情况下,解析出一层add_field放到字段里,再单独json控件进行解析
mutate对字段进行基础处理
    mutate {
        split => { "hostname" => "." }
        convert => {
          "fieldname" => "integer"
          "booleanfield" => "boolean"
        }
        copy => { "source_field" => "dest_field" }
        add_field => { "shortHostname" => "%{[hostname][0]}" }
    }

按下列配置的顺序进行处理,如自定义顺序需要分为不同的mutate控件

  • coerce,设置字段默认值,哈希
  • rename,字段重命名,哈希
  • update,更新字段值,字段不存在不会创建
  • replace,替换字段值,可以%{field}引用别的字段,字段不存在会创建
  • convert,字段值类型转换,格式为"字段名称" => "类型名称",支持的类型有:integer/float/string/boolean
  • gsub,正则表达式进行匹配替换,数组,每3个元素为一组
  • uppercase,字符串转换为大写字母,数组,值为"字段名称"
  • capitalize,字符串首字母大写,数组,值为"字段名称"
  • lowercase,字符串转换为小写字母,数组,值为"字段名称"
  • strip,去除首尾两端的空格,数组
  • split,使用分隔符将字段值拆分为数组,仅对字符串有效,"字段名称" => "分隔符"
  • join,使用分隔符连接数组,"数组字段名称" => "分隔符"
  • merge,合并数组或哈希字段,"目标字段" => "附加字段"
  • copy,复制字段,已有字段会被覆盖
  • tag_on_failure,匹配失败添加标签,数组
elasticsearch
    elasticsearch {
        hosts => [ "https://localhost:9200" ]
        user => "elastic"
        password => "-rFGdWcXsOhAWcQ0Z8Sp"
        query => "msgID:%{[traceId]} AND data_stream.namespace:single"
        sort => "@timestamp:asc"
        fields => { 
                "nodeTime" => "nodeStartTime"
                "@timestamp" => "start"
        }
}
  • hosts,elasticsearch访问url,开启SSL/TLS时为https, 支持同一集群的多个节点url
  • user,elasticsearch的用户名
  • password,elasticsearch的用户名密码
  • query,ES查询语句,为kibana界面上的KQL
  • query_template,ES查询模板文件
  • result_size,查询结果数量,默认为1
  • sort,排序规则,默认为@timestamp:desc,格式为字段名:排序方向
  • index,查找的索引名称,默认为空查询所有
  • fields,从查找结果的事件中复制字段到新事件中,格式为{ "fromField" => "toField"}
  • docinfo_fields,从查询结果的文档信息中复制字段到新事件中

output

控制台
    stdout { }
Elasticsearch
    elasticsearch {
        hosts => [ "https://localhost:9200" ]
        user => "elastic"
        password => "-rFGdWcXsOhAWcQ0Z8Sp"
        cacert => "/home/elk/logstash-8.2.3/config/hhtp_ca.crt"
        index => "logstash-hexing01-%{+yyyyMMdd}"
    }

存储到ES为UTC标准时间,但是通过kibana展示会转换为浏览器时区
支持if else语句,对不同的输入进行不同的输出处理

  • hosts,elasticsearch访问url,开启SSL/TLS时为https, 支持同一集群的多个节点url
  • user,elasticsearch的用户名
  • password,elasticsearch的用户名密码
  • cacert,elasticsearch的CA证书路径,开启TLS/SSL时配置,从elasticsearch/config/certs获取crt证书
  • document_id,文档标识,用于覆盖更新ES中有相同ID的现有条目
  • index,事件写入的索引名称,自定义,支持动态参数,%{+yyyyMMdd}为日期,数据量大时建议按HH小时,data_stream为true时不能设置;支持自定义索引模板,本期预研暂不考虑
  • data_stream,true/false/auto,使用该选项存储时间序列数据流,自动创建数据流和其后备索引
  • data_stream_type,数据流类型,logs/metrics/synthetics/traces
  • data_stream_dataset,数据集名称,字母必须小写
  • data_stream_namespace,数据集命名空间,字母必须小写
    • 数据流名称=数据流类型-数据集名称-数据集命名空间
    • 后备索引名称=.ds-数据集名称-%{+yyyy.MM.dd}后备索引创建时间-自增序号
    • Rollover滚动操作才会创建新的后备索引,通过index生命周期策略自动触发,索引模板中设置使用的生命周期策略

http://xx:9200/_ilm/policy/2-days-index-2 查看所有生命周期策略
http://xx:9200/_ilm/policy/2-days-index-2 创建生命周期策略
put http://${es_url}/_index_template/索引模板名称 创建索引模板

{
 "index_patterns": [
 "索引名称匹配模式*"
 ],
 "template": {
 "settings": {
 "index": {
 "lifecycle": {
 "name": "生命周期策略名称",
 "indexing_complete": "true"
 },
 "sort": { 
 "field": "排序字段名称,提高查询排序的性能",
 "order": "排序方向asc/desc"
 },
 "analysis": {
 "analyzer": { # 自定义分词器为每2位一个词,提高报文模糊匹配查询的性能
 "custom_two": {
 "type": "custom",
 "tokenizer": "standard_two"
 }
 },
 "tokenizer": {
 "standard_two": {
 "type": "standard",
 "max_token_length": "2"
 }
 }
 },
 "number_of_shards": "1",
 "number_of_replicas": "0"
 }
 },
 "mappings": { # 报文解析字段的映射
 "properties": {
 "字段名称": {
 "type": "字段类型"
 },
 "content": { # 设置报文使用自定义分词器,且keyword最大长度为4096
 "analyzer": "custom_two",
 "type": "text",
 "fields": {
 "keyword": {
 "type": "keyword",
 "ignore_above": 4097
 }
 }
 }
 }
 }
  },
 "priority": 210, # 索引模板优先级,索引模式匹配的模板,使用优先级最高的模板创建索引
 "version": 1
}

调用ES的API获取数据

  • 统计总条数:post http://XX/${索引名称,支持*匹配}/_count
  • 查询明细:post http://XX/${索引名称,支持*匹配}/_search?filter_path=took,hits.total.value,hits.hits._id,hits.hits._source,hits.hits._explanation,profile
  • 请求体,json格式

查询条件

"query": {
"bool": {
"filter/should": [// filter为and逻辑,should为or逻辑
// 单个或多个查询条件组合, json格式
]
}
}

范围查询,比如日期、数字

"range": {
"字段名称,比如日期、数字格式字段": {
"gte": "最小值", // >=
"lt": "最大值" // <
}
}

text.keyword term精确查询

"term": {
"字段名称.keyword": "查询值"
}

integer match精确查询

"match": {
"字段名称": 查询值
}

text match_phrase模糊匹配

"match_phrase":{
"字段名称": "查询值"
}

text.keyword regexp模糊匹配

"regexp": {
"字段名称.keyword": "正则表达式格式的查询值:.*匹配任意个字符"
}

分页查询,返回[from]-[from+size],避免深度翻页

"from": 0,
"size": 10,

  1. 排序,使用索引中设置的排序规则
    "sort": {
    "字段名称": "排序方向desc/asc"
    },

查询结果列

"_source": [
// 字段名称列表
]

查询解释,调试使用,生产环境不建议配置

"profile": true,
"explain": true

ES查询分析

ES耗性能的查询

  1. fuzzy queries (except on wildcard fields)
  2. regexp queries (except on wildcard fields)
  3. prefix queries (except on wildcard fields or those without index_prefixes)
  4. wildcard queries (except on wildcard fields)
  5. range queries on text and keyword fields
  6. wildcard搜索的时候最好避免在检索词的开头使用*
  7. avoid using the term query for text fields.
  8. 按日期降序排列,索引模板设置"sort": { "field": "tvDate", "order": "desc" }
  9. 支持分页查询,分页过多建议使用[search_after] parameter with a point in time
  10. 默认最多只支持查询10000个明细记录,需修改索引配置支持更多

查询场景

  • 短文本,精确查询 text+keyword,使用text.keyword term精确匹配
  • 长文本,模糊查询 text(自定义每2位分词)+keyword(最大长度4096),使用text match_phrase模糊匹配
  • 短文本,无查询 text+keyword
  • 短文本,模糊/精确查询 text+keyword,使用text.keyword term精确匹配,text.keyword regexp模糊匹配
  • 整数,精确查询 integer,使用match精确查询
  • 整数,范围查询 integer,使用range范围查询
  • 短文本,无查询 text+keyword
  • 日期格式文本 text+keyword
    对应tv的日期,范围查询 date,desc sort排序设置,range日期范围查询

ES存储方式

ES将各个文档中的内容,进行分词,形成词条。然后记录词条和数据的唯一标识(id)的对应关系,形成的产物。
ES中文档是以json格式的数据进行存储到索引库中的,es将document里面的所有内容进行分词,没有的词条再次增加,最后创建倒排索引。

  • 索引过程:
  1. 有一系列被索引文件
  2. 被索引文件经过语法分析和语言处理形成一系列词(Term) 。
  3. 经过索引创建形成词典和反向索引表。
  4. 通过索引存储将索引写入硬盘。
  • 搜索过程:
    a) 用户输入查询语句。
    b) 对查询语句经过语法分析和语言分析得到一系列词(Term) 。
    c) 通过语法分析得到一个查询树。
    d) 通过索引存储将索引读入到内存。
    e) 利用查询树搜索索引,从而得到每个词(Term) 的文档链表,对文档链表进行交,差,并得到结果文档。
    f) 将搜索到的结果文档对查询的相关性进行排序。
    g) 返回查询结果给用户。

服务器配置参考

  • 最小磁盘总大小 = 源数据 * (1 + 副本数量) * (1 + 索引开销) / (1 - Linux预留空间) / (1 - Elasticsearch开销) / (1 - 安全阈值)
    = 源数据 * (1 + 副本数量) * 1.7
    = 源数据 * 3.4
  • 集群最大节点数 = 单节点CPU * 5
  • 单节点最大数据量
    • 数据加速、查询聚合等场景
      单节点最大数据量 = 单节点Mem(G) * 10
    • 日志写入、离线分析等场景
      单节点最大数据量 = 单节点Mem(G) * 50
    • 通常情况
      单节点最大数据量 = 单节点Mem(G) * 30
  • 建议在小规格节点下单shard大小不要超过30GB。更高规格的节点单shard大小不要超过50GB。对于日志分析场景或者超大索引,建议单shard大小不要超过100GB。
  • shard的个数(包括副本)要尽可能匹配节点数,等于节点数,或者是节点数的整数倍。
  • 通常我们建议单节点上同一索引的shard个数不要超5个。

集群部署

Cluster

Cluster代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点可以通过选举产生。主从节点是对于集群内部来说的。Elasticsearch的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的。因为从外部来看Elasticsearch集群,在逻辑上是个整体,与任何一个节点的通信和与整个Elasticsearch集群通信是等价的。一个集群至少3个节点。

Shards

Shards代表索引分片,Elasticsearch可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上,构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。

replicas

replicas代表索引副本,Elasticsearch可以设置多个索引的副本,副本的作用一是提高系统的容错性,当某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高Elasticsearch的查询效率,Elasticsearch会自动对搜索请求进行负载均衡。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容