ELK Stack

我们使用elasticsearch作为搜索引擎,基本上会把相关的相关的软件和功能都用上了。官网提供了和它相辅相成的一套软件,起名:Elastic Stack。这里我们就看看Elastic Stack的功能和简单使用。

1. Elastic Stack功能

  • 先看看官方是怎么定义的:
    Elastic Stack:了解可帮助您构建搜索体验、解决问题并取得成功的搜索平台
    核心产品包括 ElasticsearchKibanaBeatsLogstash(也称为 ELK Stack)等等。能够安全可靠地从任何来源获取任何格式的数据,然后对数据进行搜索、分析和可视化。
    它集成的功能插件可以从这里有一个整体的认识:
    image.png
  • 再看看官方介绍的功能,那叫一个强啊:
    Elastic Stack 功能:Elastic Stack 包含各种功能(之前统一称为 X-Pack),从企业级安全性和开发人员友好型 API,到 Machine Learning 和图表分析,非常全面;借助这些功能,您能够对所有类型的数据进行大规模采集、分析、搜索和可视化。
    自身

    功能

功能有很多,这里截取了一部分,大家可以自己去看看

2 Logstash

官方介绍
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。
它的整个工作流程是三段式的:输入 -> 筛选 -> 输出

工作流程

2.1 输入

采集各种样式、大小和来源的数据。常见的输入方式有:文件、beats、http、jdbc、kafka等。
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择,可以同时从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

输入源

2.2 筛选

实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 筛选器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值。
Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 简化整体处理,不受数据源、格式或架构的影响。

使用我们丰富的筛选器库和功能多样的 Elastic Common Schema,您可以实现无限丰富的可能。

帅选

2.3 输出

选择您的存储库,传输您的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
Logstash 提供了众多输出选择,您可以将数据发送到所需的位置,并且能够灵活地解锁众多下游用例。

输出

3. elk搭建

前面我们已经搭建了elasticsearchkibana,并做了监控安装了分词器
这里我们只用docker compose安装Logstash就可以了。官网有教程,比较粗糙,可以参考。

3.1 docker compose文件

docker-compose-logstash.yml

version: '3.9'
services:
    beats:
        image: 'docker.elastic.co/logstash/logstash:8.14.3'
        ports: 
            - 5044:5044
        container_name: logstash
        hostname: logstash
        restart: always
        environment: 
            - TZ=Asia/Shanghai
            - ES_JAVA_OPTS=-Xms512m -Xmx512m
        privileged: true
        volumes:
            - /etc/timezone:/etc/timezone
            - /etc/localtime:/etc/localtime:ro
            # 映射日志文件,要读取的
            - '/logs/test:/logs/test'
            # 映射pipeline配置文件目录
            - '/opt/soft/logstash/pipeline:/usr/share/logstash/pipeline'
            # 映射logstash配置文件
            - '/opt/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml'
            # 映射elasticsearch证书目录,连接时候用
            - '/opt/soft/elasticsearch/config/certs:/usr/share/logstash/config/certs'
        networks:
            elastic: 
                # {}
                ipv4_address: 172.18.0.16
networks:
    elastic: 
      external: true
      driver: bridge
3.2 配置文件

Logstash 的配置主要分为两部分:Pipeline 配置文件Settings 配置文件

  • Pipeline 配置文件(logstash.conf):这是 Logstash 的核心配置,用于定义数据处理的流程,包括输入(input)、过滤(filter)和输出(output)三个部分。每个部分都可以使用多种插件来完成特定的任务。例如,输入部分可以使用 file 插件从文件中读取数据,过滤部分可以使用 grok 插件解析日志,输出部分可以使用 elasticsearch 插件将数据发送到 Elasticsearch。位置在./pipeline/logstash.conf
  • Settings 配置文件(logstash.yml):这是 Logstash 的全局配置,包括 Logstash 实例的名称、数据存储路径、配置文件路径、自动重载配置、工作线程数量等。位置在:./config/logstash.yml
    在 Logstash 启动时,它会首先读取 Settings 配置文件,然后加载并执行 Pipeline 配置文件。
3.2.1 logstash.conf

注意:证书文件需要处理一下,不支持.crt格式的,复制一份,后缀改成.pem

input {
    # 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
    # 选择一个即可
    file {
      path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
      # 从文件开头就获取
     start_position => "beginning"
     exclude => "*.zip"
  }
#  beats {
#    port => 5044
#    codec => json {
#      charset => "UTF-8"
#    }
#  }
#   jdbc {
#    jdbc_driver_library => "/path/to/your/jdbc/driver"
#    jdbc_driver_class => "com.mysql.jdbc.Driver"
#    jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
#    jdbc_user => "yourusername"
#    jdbc_password => "yourpassword"
#  }
#   kafka {
#    bootstrap_servers => "localhost:9092"
#    topics => ["your_topic"]
#  }
}

filter {  
    # grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
    # 效果就是输出中添加了一个字段message,按指定格式来的
    # 如果就想看 日志中的原格式,这个就不用加。比如java项目的日志,格式就挺好的
  #grok {
    #   match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:loglevel} %{DATA:logger} - %{GREEDYDATA:message}" }
  #}
    # date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
  #date {
  #  match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
    #target => "@timestamp"
    #   locale => "cn"
  #}
    # 排除一些没用字段,logstash输出的内容中有一些字段,看需不需要
    mutate {
        remove_field => ["@version", "tags"]
    }
}

# 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 一般都是接elasticsearch
output {
    # elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
    # 有的配置项可能会改名字,自己进去确认
        # elasticsearch 2种方式:'file文件'、'filebeat'。用一个,另一个注释掉
    # 直接从file文件读取时
 elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "test-%{+YYYY.MM.dd}" 
        user => "elastic"
        password => "123456"
        ssl_enabled => true
        ssl_certificate_authorities => "config/certs/ca/ca.pem"
    }  
  # 从filebeats获取数据,'tags'是filebeat传过来数据中的一个字段,在filebeat中配置
  if "test-01-server" in [tags] {
        elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "test-01-server-%{+YYYY.MM.dd}" 
        user => "elastic"
        password => "123456"
        ssl_enabled => true
        ssl_certificate_authorities => "config/certs/ca/ca.pem"
    }  
  }
   if "test-02-server" in [tags] {
        elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "test-02-server-%{+YYYY.MM.dd}" 
        user => "elastic"
        password => "123456"
        ssl_enabled => true
        ssl_certificate_authorities => "config/certs/ca/ca.pem"
    }  
  }
  # 输出到启动logstash的shell页面 
  stdout { 
    codec => rubydebug 
  }
}
3.2.2 logstash.yml

没有配置什么,可以按需设置

#监听地址
http.host: "0.0.0.0"
#节点名称,${index}是脚本变量,两个节点,1和2
#node.name: logstash-node-${index} 

#开启logstash指标监测,将指标数据发送到es集群
#xpack.monitoring.enabled: true  
#xpack.monitoring.elasticsearch.hosts: ["https://172.18.0.11:9200"]
#xpack.monitoring.elasticsearch.username: "elastic"
#xpack.monitoring.elasticsearch.password: "123456"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "config/certs/ca.pem"

#设置output或filter插件的工作线程数
#pipeline.workers: 8 
#设置批量执行event的最大值
#pipeline.batch.size: 125 
#批处理的最大等待值
#pipeline.batch.delay: 5000 
#开启持久化
#queue.type: persisted 
#队列存储路径;如果队列类型为persisted,则生效
#path.queue: /usr/share/logstash/data 
#队列为持久化,单个队列大小
#queue.page_capacity: 250mb 
#当启用持久化队列时,队列中未读事件的最大数量,0为不限制
#queue.max_events: 0 
#队列最大容量
#queue.max_bytes: 1024mb 
#在启用持久队列时强制执行检查点的最大数量,0为不限制
#queue.checkpoint.acks: 1024 
#在启用持久队列时强制执行检查点之前的最大数量的写入事件,0为不限制
#queue.checkpoint.writes: 1024
#当启用持久队列时,在头页面上强制一个检查点的时间间隔 
#queue.checkpoint.interval: 1000 
3.3.3 logstash.conf(API Key

注意:很不幸,logstash启动不成功,因为连接不上elasticsearch,证书文件打不开,但是kibana和metricbeat连的时候都是用的这个文件啊!查看官方文档,上面写着支持 .cer.pem文件,但是当初elasticsearch这个吊毛给我们生成的是.crt证书,没有.pem。我也不会转,只怪自己对证书概念了解的不够,客户端和服务端不一样吧。真是坑啊!!!
所以我们输出连接elasticsearch的时候换个方式,用API Key试试。

input {
    # 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
    # 选择一个即可
    file {
    path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
        exclude => "*.zip"
  }
}

filter {  
    # grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
  grok {
    match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
  }
    # date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
  date {
    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
  }
}

# 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 一般都是接elasticsearch
output {
    # elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
    # 有的配置项可能会改名字,自己进去确认
    elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "test-%{+YYYY.MM.dd}" 
        ssl_enabled => true
        ssl_verification_mode => full
        api_key => "apiId(自己的):SWtoZ1RwRUIyODlSYU8zX056Sy06TDZBTHF0NHhSckdCQXFaRFNyMDBlQQ=="
    }  
  # 输出到启动logstash的shell页面 
  stdout { 
    codec => rubydebug 
  }
}

好吧,又失败了。容器是起来了,但是连接elasticsearch连不上,还是一直找证书。再换使用ca_trusted_fingerprint连接还是报错:PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
没有办法,百度了一下,crt格式后缀可以直接改成pem后缀使用,我就改了,按第一种证书的连接就可以了。

logstash success

4. 查看日志

当logstash启动成功以后,会把日志文件写到elasticsearch中,但是此时,我们没法直接在kibana中查看内容。需要将数据在kibana中做一个数据视图,然后才能看数据。

目录

创建视图

然后就可以在discover中查看这个索引了。
image.png

我们排除了2个字段,不排除接收到的数据原来是这样的:

{
  "@version" => "1",
  "tags" => [
            [0] "_grokparsefailure"
        ],
  "log" => {
         "file" => {
                 "path" => "/logs/test/test.log"
            }
    },
  "@timestamp" => 2024-08-14T09:02:24.211705249Z,
  "event" => {
      "original" => "2024-07-27 17:27:54.453 [http-nio-9001-exec-6] INFO  org.springdoc.api.AbstractOpenApiResource.getOpenApi(390) -- Init duration for springdoc-openapi is: 79 ms\r"
   },
  "host" => {
      "name" => "d8a03914b903"
    }
 }

5. 将logstash添加到metricbeat监控

这个官网给了的,直接参考拿来用就行。

    1. logstash.yml中将自身监控关闭,这个上面已经配置了
monitoring.enabled: false
    1. 在metricbeat的配置文件中加上logstash模块
  - module: logstash
    metricsets:
      - node
      - node_stats
    period: 10s
    hosts: ["http://172.18.0.16:9600"]
    xpack.enabled: true
    1. 官网建议关闭metricbeat的系统监控
      system监控默认是开启的,可以选择关闭。官网说:除非你有其他目的。我先不不管这个了。
    1. 重启metricbeat,等2分钟,进kibana那会自动添加logstash监控应该能看到。


      image.png

      image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容