Logstash插件开发入门

ELK三套件简单介绍

在这种架构中,只有一个 Logstash、Elasticsearch 和 Kibana 实例。Logstash 通过输入插件从多种数据源(比如日志文件、标准输入 Stdin 等)获取数据,再经过滤插件加工数据,然后经 Elasticsearch 输出插件输出到 Elasticsearch,通过 Kibana 展示

总的来说:

Logstash: 负责数据收集
ElastcSearch: 负责数据存储
Kibana: 负责数据查询展示

Logstash 可独立存在,作为数据梳理的中心。数据可以输出到除了ES之外的大多数数据仓库

image.png
环境搭建
  1. 下载安装包
curl https://artifacts.elastic.co/downloads/logstash/logstash-8.3.3-linux-x86_64.tar.gz
  1. 安装(linux)
tar zxvf logstash-8.3.3-linux-x86_64.tar.gz
  1. 运行

    3.1 调试模式
    调试模式,在开发过程中非常有用,可以方便调试和验证插件的正确性
    首先需要关闭配置自动刷新功能,否则启动会失败
    打开配置文件vim logstash/config/logstash.yml

    # config.reload.automatic: true
    # config.reload.interval: 3600s启动调试模式
    

    启动调试模式

    # 在logstash安装目录下执行
    /bin/logstash -e 
    

    3.2 生产模式

    生成环境中,往往组要自定义数据保存路径;日志输出路径等等自定义参数

    # 参数也可以在配置文件中定义
    /bin/logstash --path.data /var/data  --path.logs /var/logs
    
  1. 开发插件
    Logstash事件的处理管线是inputs → filters → outputs,三个阶段都可以自定义插件,本文主要介绍如何开发input插件

    4.1 cd到Logstash的安装目录,使用bin/logstash-plugin生成filter插件模板,如下:

    bin/logstash-plugin generate --type code --name logstash-codec --logstash-codec-netstream   
    

    4.2. 代码目录说明

    目录结构

目录结构说明:

目录 说明
doc 插件文档目录
lib 插件代码目录
spec 插件测试代码目录
Gemfile gem管理文件
logstash-codec-netstream 插件描述文件

4.3 插件核心代码

logstash插件多为Ruby编写,各类型插件代码的默认目录结构变化不大。默认插件目录在logstash/vendor/bundle/jruby/2.5.0/gems/logstash-*

例如,本插件代码路径为:logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netstream/lib/logstash/codecs/netstream.ruby,代码结构如下:

# encoding: utf-8
 require "logstash/codecs/base"
 require "logstash/namespace"

 # This  filter will replace the contents of the default
 # message field with whatever you specify in the configuration.
 #
 # It is only intended to be used as an .
 class LogStash::Filters::Netstream < LogStash::Filters::Base

    # Setting the config_name here is required. This is how you
    # configure this filter from your Logstash config.
    #
    # filter {
    #    {
    #     message => "My message..."
    #   }
    # }
    #
    config_name "netstream"

    # Replace the message with this value.
    config :message, :validate => :string, :default => "Hello World!"

    def register
       # Add instance variables
    end # def register

    def decode(event)
       events = []
       return events
    end # def filter
 end # class LogStash::Filters::Netstream 

插件编写请参考Logstash官网说明:

https://www.elastic.co/guide/en/logstash/current/deploying-and-scaling.html

4.4 插件编译

通过命令行gem或rubymine均可编译已经写好的插件

4.4.1 gem编译

root> gem build
WARNING:  license value 'Apache License (2.0)' is invalid.  Use a license identifier from
http://spdx.org/licenses or 'Nonstandard' for a nonstandard license.
Did you mean 'Apache-2.0'?
WARNING:  open-ended dependency on bindata (>= 1.5.0) is not recommended
  if bindata is semantically versioned, use:
WARNING:  See https://guides.rubygems.org/specification-reference/ for help
  Successfully built RubyGem
  Name: logstash-codec-netstream
  Version: 1.1.1
  File: logstash-codec-netstream-1.1.1.gem

4.4.2 rubymine编译

rubymine编译插件

编译完成后生成xxxx.gem的插件表示成功编译

  1. 插件安装与调试

    5.1 插件打包

    将生成的xxxx.gem 压缩成xxxx.gem.zip文件,上传.zip文件至安装环境任意目录

    5.2 自定义插件安装

    进入到logstash安装目录,执行如下命令

    logstash/bin/logstash-plugin install file:///env/logstash/logstash-codec-netstream-1.1.1.gem.zip
    

    5.2 其他方式安装

    #在线安装(安装地址:Rubygems)
    bin/logstash-plugin install logstash-output-kafka
    #离线gem安装(此方法验证不成功)
    bin/logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem
    #运行本地插件
    bin/logstash --path.plugins /opt/shared/lib
    #升级本地插件
    bin/logstash-plugin update logstash-output-kafka
    

    5.3 插件管理

    对于插件,官方提供了很多管理命令

    #列举已经安装的插件bin/logstash-plugin list#列举已经安装的插件和其版本信息
    bin/logstash-plugin list --verbose #通过名称筛选插件
    bin/logstash-plugin list '*namefragment*' #根据插件类型筛选插件,对应input|filter|codec|output
    bin/logstash-plugin list --group output
    

    5.4 插件调试

    插件安装后就可以在logstash的配置文件使用,使用插件代码中定义的config_name 名字编写配置文件

    配置文件示例:

    input {
      # Netstream
      udp {
        id => "input-udp-netstream"
        host => "0.0.0.0"
        port => "2057"
        workers => "4"
        queue_size => "2048}"
        receive_buffer_bytes => "33554432"
        codec => netstream {
          versions => [5,9,10]
          include_flowset_id => "true"
          netstream_definitions => "/etc/logstash/elastiflow/definitions}/netflow.yml"
          ipfix_definitions => "/etc/logstash/elastiflow/definitions}/ipfix.yml"
        }
        type => "netstream"
      }
    }
    
    

    其中,netstream 便是我们定义的插件

    重启logstash

    但是,有时候logstash重启会花费太多时间,对于代码调试效率太低,可以通过logstash 提供的调试模式进行验证代码的合理性,调试codec插件:

    bin/logstash -e 'input{ udp { port => "2057" codec => netstream { versions => [5,9,10] include_flowset_id => "true" } } } output{stdout { codec => rubydebug }}'
    

    对于其他类型的插件这种方法也适用,调试filter插件中镶嵌的ruby代码

    ruby代码:

    require "redis"
    
    def register(params)
      @host = params["host"]
      @port = params["port"]
      @password = params["password"]
      @db = params["db"]
      @tll = params["tll"]
    
      @redis=Redis.new(host:@host, port: @port, username: @username, password: @password, db:@db )
    end
    
    # the filter method receives an event and must return a list of events.
    # Dropping an event means not including it in the return array,
    # while creating new ones only requires you to add a new instance of
    # LogStash::Event to the returned array
    def filter(event)
      puts "======================================================"
      key = 'logstash-metrics-keys:'+"#{event.get('[host][ip]')}#{event.get('@timestamp').to_f.round(3)*1000}"
      @redis.set(key,1,{"ex"=> 5})
      puts "===================================================="
      return []
    end
    
    test "drop percentage 100%" do
      parameters do
        { "host" => "127.0.0.1","port"=>6379,"password"=>"crdigitalmnet","db"=>0 }
      end
    
      in_event { { "message" => "hello","host"=>{"ip"=>"1.1.1.1"} } }
    
      expect("drops the event") do |events|
        events.size == 0
      end
    end
    

    调试命令:

    bin/logstash -e "filter { ruby { path => '/evn/logstash/events_metrics.rb' script_params => { 'drop_percentage' => 0.5 } } }" -t
    

    5.5 开启调试日志

    对于运行中的logstash可以通过提供的API接口开启logstash的调试日志,并且支持单独模块的日志开启,这对于插件开发非常有用。

    可以修改log4j2.properties文件并重新启动你的Logstash,但这既乏味又会导致不必要的停机,相反,你可以通过日志记录API动态更新日志记录级别,这些设置立即生效,不需要重新启动。

    默认情况下,日志API试图绑定到tcp:9600,如果该端口已经被另一个Logstash实例使用,你需要使用--http.port标志启动Logstash指定绑定到另一个端口,有关更多信息,请参阅命令行标志。

    要更新日志级别,请使用你感兴趣的子系统/模块并预先设置logger.,例如:

    curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' -d'
    {
        "logger.logstash.outputs.elasticsearch" : "DEBUG"
    }
    '
    

    当这个设置生效时,Logstash为配置中指定的所有Elasticsearch输出发出DEBUG级别日志,请注意,这个新设置是暂时的,不会在重新启动时存活。

    应该向log4j2.properties添加持久性更改,例如:

    logger.elasticsearchoutput.name = logstash.outputs.elasticsearch
    logger.elasticsearchoutput.level = debug
    

    查看日志开启情况,可通过_node/logging接口查看:

    curl -XGET 'localhost:9600/_node/logging?pretty'
    

    响应示例:

    {
      "host" : "4-17",
      "version" : "8.3.2",
      "http_address" : "127.0.0.1:9600",
      "id" : "de9b8144-0108-4d6f-8167-e06fb1b6b0cb",
      "name" : "4-17",
      "ephemeral_id" : "56649e83-ef61-4749-a057-fa4020604671",
      "status" : "green",
      "snapshot" : false,
      "pipeline" : {
        "workers" : 16,
        "batch_size" : 125,
        "batch_delay" : 50
      },
      "loggers" : {
        "com.rabbitmq.client.Address" : "INFO",
        "com.rabbitmq.client.ConnectionFactory" : "INFO",
        "deprecation.logstash.codecs.netflow" : "WARN",
        "deprecation.logstash.codecs.netstream" : "WARN",
        "deprecation.logstash.codecs.plain" : "WARN",
        "deprecation.logstash.codecs.sflow" : "WARN",
        "deprecation.logstash.filters.cidr" : "WARN",
        "deprecation.logstash.filters.date" : "WARN",
        "deprecation.logstash.filters.dns" : "WARN",
        "deprecation.logstash.filters.drop" : "WARN",
        "deprecation.logstash.filters.geoip" : "WARN",
        "deprecation.logstash.filters.mutate" : "WARN",
        "deprecation.logstash.filters.ruby" : "WARN",
        "deprecation.logstash.filters.translate" : "WARN",
        "deprecation.logstash.inputs.tcp" : "WARN",
        "deprecation.logstash.inputs.udp" : "WARN",
        "deprecation.logstash.outputs.elasticsearch" : "WARN",
        "io.netty.bootstrap.ServerBootstrap" : "INFO",
        "io.netty.buffer.ByteBufUtil" : "INFO",
        "io.netty.buffer.PoolThreadCache" : "INFO",
        "io.netty.buffer.PooledByteBufAllocator" : "INFO",
        "io.netty.channel.AbstractChannel" : "INFO",
        "io.netty.channel.AbstractChannelHandlerContext" : "INFO",
        "io.netty.channel.ChannelHandlerMask" : "INFO",
     }
    }
    

    要重置任何可能通过日志API动态更改的日志级别,请向_node/logging/reset发送PUT请求,所有日志级别都将恢复到log4j2.properties文件中指定的值。

    curl -XPUT 'localhost:9600/_node/logging/reset?pretty'
    

FAQ
  1. logstash 插件开发中依赖第三方gem库,如何安装依赖gem?
gem install logstash-core -v 5.6.4
gem install logstash-core-plugin-api
gem install logstash-mixin-event_support
  1. logstash插件开发中如何修改数据中的@timestamp 的值?

    默认情况下,logstash使用系统收到数据的时间作为@timestamp 的值,此值是UTC时间。修改方法有两种

    其一,通过配置文件修改,在logstash的pipline的配置文件的input、filter、codes任意流程中加入以下配置

    # 在input配置文件中增加如下代码
       ruby {
            code => "
                    event.set('timestamp',event.timestamp.time.localtime + 8*60*60)
                    event.set('@timestamp',event.get('timestamp'))
                    "
        }
    

    其二、在自定义插件中通过代码修改

    LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(unix_nsec, unix_nsec / 1000),
    

    unix_nsec 是unix格式的时间戳

  1. logstash的监控功能如何开启?

    logstash 资源占用、数据解析过程可通过官方提供的metricbeat进行监控

    3.1 安装过程参考:

    MetricBeat安装过程

    MetricBeat监控logstash

    3.2 MetricBeat简单使用

    查看监控模块

    metricbeat modules list
    

    开启监控模块

    metricbeat modules enable logstash
    

    重启服务

    sudo service metricbeat start
    
  1. logstash 没有收到数据,日志中也没有数据接收记录如何排查?

    以UDP接收为例,先看一下从网卡到应用程序之间数据处理流程

    网络数据处理流程

    4.1 排查网卡是否丢包

    # 在输出中查找 bad 或者 drop 对应的字段是否有数据,在正常情况下,这些字段对应的数字应该都是 0
    ethtool -S eth0
    

    4.2 抓包查看源地址/目的地址/mac地址是否正确

    # 控制台抓包
    tcpdump -vvv -i ens192 udp port 2057 -e
    # 抓包内容保存到文件,可用wireshark分析
    tcpdump -vvv -i ens192 udp port 2057 -w netstream.pcap
    

    4.3 查看防火墙规则

     iptables -L
    

    4.5 查看logstash日志

    tailf /var/log/logstash/logstash.log
    
  1. 是否可以通过模拟UDP数据包测试logstash解析过程?

    通过tcprewrite工具可以对tcpdump抓到的包进行重放,模拟udp数据发送.

    tcprewrite回放不能在同一个机器进行回放,操作系统会丢弃数据包

    5.1 安装tcprewrite

    wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/t/
    rpm -Uvh epel-release*rpm
    yum install tcpreplay
    

    5.2 修改源/目的地址

    tcprewrite  --srcipmap=10.121.215.248/32:10.121.50.146/32 -i http_03.pcapng -o http_03.pcap
    

    5.3 回放数据包

    # 注意:tcpreplay 不能在同一个机器进行回放,会被操作系统丢弃数据包
    
    tcpreplay -i ens192 -p 1 netstream.pacp
    

    5.4 关闭反向路由校验(非必须)

    如果在不同机器之间进行UDP数据回放仍然无法收到数据,考虑操作系统是否开启了反向路由校验

    # 查看内核参数
    sysctl -a|grep rp_filter
    # 临时修改内核参数
    sysctl net.ipv4.conf.all.rp_filter=0
    sysctl net.ipv4.conf.default.rp_filter
    
  2. ES中有数据,但logstash中查询不到?

    Kibana时间和日志时间有差异无法获取时间区间数据。查看Kibana时区是否正确,更改Kibana查询时间

  3. xxx

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容