ELK三套件简单介绍
在这种架构中,只有一个 Logstash、Elasticsearch 和 Kibana 实例。Logstash 通过输入插件从多种数据源(比如日志文件、标准输入 Stdin 等)获取数据,再经过滤插件加工数据,然后经 Elasticsearch 输出插件输出到 Elasticsearch,通过 Kibana 展示
总的来说:
Logstash: 负责数据收集
ElastcSearch: 负责数据存储
Kibana: 负责数据查询展示
Logstash 可独立存在,作为数据梳理的中心。数据可以输出到除了ES之外的大多数数据仓库
环境搭建
- 下载安装包
curl https://artifacts.elastic.co/downloads/logstash/logstash-8.3.3-linux-x86_64.tar.gz
- 安装(linux)
tar zxvf logstash-8.3.3-linux-x86_64.tar.gz
-
运行
3.1 调试模式
调试模式,在开发过程中非常有用,可以方便调试和验证插件的正确性
首先需要关闭配置自动刷新功能,否则启动会失败
打开配置文件vim logstash/config/logstash.yml# config.reload.automatic: true # config.reload.interval: 3600s启动调试模式
启动调试模式
# 在logstash安装目录下执行 /bin/logstash -e
3.2 生产模式
生成环境中,往往组要自定义数据保存路径;日志输出路径等等自定义参数
# 参数也可以在配置文件中定义 /bin/logstash --path.data /var/data --path.logs /var/logs
-
开发插件
Logstash事件的处理管线是inputs → filters → outputs,三个阶段都可以自定义插件,本文主要介绍如何开发input插件4.1 cd到Logstash的安装目录,使用bin/logstash-plugin生成filter插件模板,如下:
bin/logstash-plugin generate --type code --name logstash-codec --logstash-codec-netstream
4.2. 代码目录说明
目录结构说明:
目录 | 说明 |
---|---|
doc | 插件文档目录 |
lib | 插件代码目录 |
spec | 插件测试代码目录 |
Gemfile | gem管理文件 |
logstash-codec-netstream | 插件描述文件 |
4.3 插件核心代码
logstash插件多为Ruby编写,各类型插件代码的默认目录结构变化不大。默认插件目录在logstash/vendor/bundle/jruby/2.5.0/gems/logstash-*
例如,本插件代码路径为:logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netstream/lib/logstash/codecs/netstream.ruby,代码结构如下:
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/namespace"
# This filter will replace the contents of the default
# message field with whatever you specify in the configuration.
#
# It is only intended to be used as an .
class LogStash::Filters::Netstream < LogStash::Filters::Base
# Setting the config_name here is required. This is how you
# configure this filter from your Logstash config.
#
# filter {
# {
# message => "My message..."
# }
# }
#
config_name "netstream"
# Replace the message with this value.
config :message, :validate => :string, :default => "Hello World!"
def register
# Add instance variables
end # def register
def decode(event)
events = []
return events
end # def filter
end # class LogStash::Filters::Netstream
插件编写请参考Logstash官网说明:
https://www.elastic.co/guide/en/logstash/current/deploying-and-scaling.html
4.4 插件编译
通过命令行gem或rubymine均可编译已经写好的插件
4.4.1 gem编译
root> gem build
WARNING: license value 'Apache License (2.0)' is invalid. Use a license identifier from
http://spdx.org/licenses or 'Nonstandard' for a nonstandard license.
Did you mean 'Apache-2.0'?
WARNING: open-ended dependency on bindata (>= 1.5.0) is not recommended
if bindata is semantically versioned, use:
WARNING: See https://guides.rubygems.org/specification-reference/ for help
Successfully built RubyGem
Name: logstash-codec-netstream
Version: 1.1.1
File: logstash-codec-netstream-1.1.1.gem
4.4.2 rubymine编译
编译完成后生成xxxx.gem
的插件表示成功编译
-
插件安装与调试
5.1 插件打包
将生成的
xxxx.gem
压缩成xxxx.gem.zip文件,上传.zip文件至安装环境任意目录5.2 自定义插件安装
进入到logstash安装目录,执行如下命令
logstash/bin/logstash-plugin install file:///env/logstash/logstash-codec-netstream-1.1.1.gem.zip
5.2 其他方式安装
#在线安装(安装地址:Rubygems) bin/logstash-plugin install logstash-output-kafka #离线gem安装(此方法验证不成功) bin/logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem #运行本地插件 bin/logstash --path.plugins /opt/shared/lib #升级本地插件 bin/logstash-plugin update logstash-output-kafka
5.3 插件管理
对于插件,官方提供了很多管理命令
#列举已经安装的插件bin/logstash-plugin list#列举已经安装的插件和其版本信息 bin/logstash-plugin list --verbose #通过名称筛选插件 bin/logstash-plugin list '*namefragment*' #根据插件类型筛选插件,对应input|filter|codec|output bin/logstash-plugin list --group output
5.4 插件调试
插件安装后就可以在logstash的配置文件使用,使用插件代码中定义的
config_name
名字编写配置文件配置文件示例:
input { # Netstream udp { id => "input-udp-netstream" host => "0.0.0.0" port => "2057" workers => "4" queue_size => "2048}" receive_buffer_bytes => "33554432" codec => netstream { versions => [5,9,10] include_flowset_id => "true" netstream_definitions => "/etc/logstash/elastiflow/definitions}/netflow.yml" ipfix_definitions => "/etc/logstash/elastiflow/definitions}/ipfix.yml" } type => "netstream" } }
其中,
netstream
便是我们定义的插件重启logstash
但是,有时候logstash重启会花费太多时间,对于代码调试效率太低,可以通过logstash 提供的调试模式进行验证代码的合理性,调试codec插件:
bin/logstash -e 'input{ udp { port => "2057" codec => netstream { versions => [5,9,10] include_flowset_id => "true" } } } output{stdout { codec => rubydebug }}'
对于其他类型的插件这种方法也适用,调试filter插件中镶嵌的ruby代码
ruby代码:
require "redis" def register(params) @host = params["host"] @port = params["port"] @password = params["password"] @db = params["db"] @tll = params["tll"] @redis=Redis.new(host:@host, port: @port, username: @username, password: @password, db:@db ) end # the filter method receives an event and must return a list of events. # Dropping an event means not including it in the return array, # while creating new ones only requires you to add a new instance of # LogStash::Event to the returned array def filter(event) puts "======================================================" key = 'logstash-metrics-keys:'+"#{event.get('[host][ip]')}#{event.get('@timestamp').to_f.round(3)*1000}" @redis.set(key,1,{"ex"=> 5}) puts "====================================================" return [] end test "drop percentage 100%" do parameters do { "host" => "127.0.0.1","port"=>6379,"password"=>"crdigitalmnet","db"=>0 } end in_event { { "message" => "hello","host"=>{"ip"=>"1.1.1.1"} } } expect("drops the event") do |events| events.size == 0 end end
调试命令:
bin/logstash -e "filter { ruby { path => '/evn/logstash/events_metrics.rb' script_params => { 'drop_percentage' => 0.5 } } }" -t
5.5 开启调试日志
对于运行中的logstash可以通过提供的API接口开启logstash的调试日志,并且支持单独模块的日志开启,这对于插件开发非常有用。
可以修改
log4j2.properties
文件并重新启动你的Logstash,但这既乏味又会导致不必要的停机,相反,你可以通过日志记录API动态更新日志记录级别,这些设置立即生效,不需要重新启动。默认情况下,日志API试图绑定到
tcp:9600
,如果该端口已经被另一个Logstash实例使用,你需要使用--http.port
标志启动Logstash指定绑定到另一个端口,有关更多信息,请参阅命令行标志。要更新日志级别,请使用你感兴趣的子系统/模块并预先设置
logger.
,例如:curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' -d' { "logger.logstash.outputs.elasticsearch" : "DEBUG" } '
当这个设置生效时,Logstash为配置中指定的所有Elasticsearch输出发出DEBUG级别日志,请注意,这个新设置是暂时的,不会在重新启动时存活。
应该向
log4j2.properties
添加持久性更改,例如:logger.elasticsearchoutput.name = logstash.outputs.elasticsearch logger.elasticsearchoutput.level = debug
查看日志开启情况,可通过_node/logging接口查看:
curl -XGET 'localhost:9600/_node/logging?pretty'
响应示例:
{ "host" : "4-17", "version" : "8.3.2", "http_address" : "127.0.0.1:9600", "id" : "de9b8144-0108-4d6f-8167-e06fb1b6b0cb", "name" : "4-17", "ephemeral_id" : "56649e83-ef61-4749-a057-fa4020604671", "status" : "green", "snapshot" : false, "pipeline" : { "workers" : 16, "batch_size" : 125, "batch_delay" : 50 }, "loggers" : { "com.rabbitmq.client.Address" : "INFO", "com.rabbitmq.client.ConnectionFactory" : "INFO", "deprecation.logstash.codecs.netflow" : "WARN", "deprecation.logstash.codecs.netstream" : "WARN", "deprecation.logstash.codecs.plain" : "WARN", "deprecation.logstash.codecs.sflow" : "WARN", "deprecation.logstash.filters.cidr" : "WARN", "deprecation.logstash.filters.date" : "WARN", "deprecation.logstash.filters.dns" : "WARN", "deprecation.logstash.filters.drop" : "WARN", "deprecation.logstash.filters.geoip" : "WARN", "deprecation.logstash.filters.mutate" : "WARN", "deprecation.logstash.filters.ruby" : "WARN", "deprecation.logstash.filters.translate" : "WARN", "deprecation.logstash.inputs.tcp" : "WARN", "deprecation.logstash.inputs.udp" : "WARN", "deprecation.logstash.outputs.elasticsearch" : "WARN", "io.netty.bootstrap.ServerBootstrap" : "INFO", "io.netty.buffer.ByteBufUtil" : "INFO", "io.netty.buffer.PoolThreadCache" : "INFO", "io.netty.buffer.PooledByteBufAllocator" : "INFO", "io.netty.channel.AbstractChannel" : "INFO", "io.netty.channel.AbstractChannelHandlerContext" : "INFO", "io.netty.channel.ChannelHandlerMask" : "INFO", } }
要重置任何可能通过日志API动态更改的日志级别,请向
_node/logging/reset
发送PUT
请求,所有日志级别都将恢复到log4j2.properties
文件中指定的值。curl -XPUT 'localhost:9600/_node/logging/reset?pretty'
FAQ
- logstash 插件开发中依赖第三方gem库,如何安装依赖gem?
gem install logstash-core -v 5.6.4
gem install logstash-core-plugin-api
gem install logstash-mixin-event_support
-
logstash插件开发中如何修改数据中的
@timestamp
的值?默认情况下,logstash使用系统收到数据的时间作为
@timestamp
的值,此值是UTC时间。修改方法有两种其一,通过配置文件修改,在logstash的pipline的配置文件的input、filter、codes任意流程中加入以下配置
# 在input配置文件中增加如下代码 ruby { code => " event.set('timestamp',event.timestamp.time.localtime + 8*60*60) event.set('@timestamp',event.get('timestamp')) " }
其二、在自定义插件中通过代码修改
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(unix_nsec, unix_nsec / 1000),
unix_nsec 是unix格式的时间戳
-
logstash的监控功能如何开启?
logstash 资源占用、数据解析过程可通过官方提供的metricbeat进行监控
3.1 安装过程参考:
3.2 MetricBeat简单使用
查看监控模块
metricbeat modules list
开启监控模块
metricbeat modules enable logstash
重启服务
sudo service metricbeat start
-
logstash 没有收到数据,日志中也没有数据接收记录如何排查?
以UDP接收为例,先看一下从网卡到应用程序之间数据处理流程
4.1 排查网卡是否丢包
# 在输出中查找 bad 或者 drop 对应的字段是否有数据,在正常情况下,这些字段对应的数字应该都是 0 ethtool -S eth0
4.2 抓包查看源地址/目的地址/mac地址是否正确
# 控制台抓包 tcpdump -vvv -i ens192 udp port 2057 -e # 抓包内容保存到文件,可用wireshark分析 tcpdump -vvv -i ens192 udp port 2057 -w netstream.pcap
4.3 查看防火墙规则
iptables -L
4.5 查看logstash日志
tailf /var/log/logstash/logstash.log
-
是否可以通过模拟UDP数据包测试logstash解析过程?
通过tcprewrite工具可以对tcpdump抓到的包进行重放,模拟udp数据发送.
tcprewrite回放不能在同一个机器进行回放,操作系统会丢弃数据包
5.1 安装tcprewrite
wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/t/ rpm -Uvh epel-release*rpm yum install tcpreplay
5.2 修改源/目的地址
tcprewrite --srcipmap=10.121.215.248/32:10.121.50.146/32 -i http_03.pcapng -o http_03.pcap
5.3 回放数据包
# 注意:tcpreplay 不能在同一个机器进行回放,会被操作系统丢弃数据包 tcpreplay -i ens192 -p 1 netstream.pacp
5.4 关闭反向路由校验(非必须)
如果在不同机器之间进行UDP数据回放仍然无法收到数据,考虑操作系统是否开启了反向路由校验
# 查看内核参数 sysctl -a|grep rp_filter # 临时修改内核参数 sysctl net.ipv4.conf.all.rp_filter=0 sysctl net.ipv4.conf.default.rp_filter
-
ES中有数据,但logstash中查询不到?
Kibana时间和日志时间有差异无法获取时间区间数据。查看Kibana时区是否正确,更改Kibana查询时间
xxx