测试环境
filebeat从日志中收集,向kafka中输出
logstash从kafka中收集,向elasticsearch中输出
kibana
配置文件config/kibana.yml
server.port: 9102
server.host: "0.0.0.0"
elasticsearch.hosts: "http://localhost:9200"
kibana.index: ".kibana"
后台启动kibana并监听日志文件
nohup sh /usr/local/src/kibana-7.4.0-linux-x86_64/bin/kibana --allow-root > /usr/local/src/logs/kibana.log &
tail -f /usr/local/src/logs/kibana.log
添加权限
chmod +x kibana.sh
elasticsearch
将分词插件下载到plugins目录下
#es不允许root用户启动,需要添加新用户身份
#创建elsearch用户组及elsearch用户
groupadd elsearch
useradd elsearch -g elsearch -p elasticsearch
#更改elasticsearch文件夹及内部文件的所属用户及组为elsearch:elsearch
chown -R elsearch:elsearch elasticsearch
#切换到elsearch用户再启动
su elsearch
#守护进程运行
./bin/elasticsearch -d
#验证启动进程
ps aux | grep elasticsearch
logstash
设置启动文件
logstash.sh
nohup sh /usr/local/src/logstash-7.4.0/bin/logstash -f /usr/local/src/logstash-7.4.0/conf.d/ --config.reload.automatic >> /usr/local/src/logs/logstash.log &
tail -f /usr/local/src/logs/logstash.log
#随意验证
mkdir -p /root/logs/
date >> /root/logs/1.log
chmod +x logstash.sh
编写配置文件(在conf.d下创建config.conf)
input {
# 从文件中传入日志
file {
path => "/root/logs/*.log"
start_position => beginning
add_field => {"from" => "localfile"}
}
kafka {
bootstrap_servers => ["192.168.200.182:9103"]
group_id => "logstash"
topics => ["filebeat"]
consumer_threads => 1
decorate_events => true
add_field => {"from" => "filebeat"}
codec=>"json"
}
kafka {
bootstrap_servers => ["192.168.200.182:9103"]
group_id => "logstash"
topics => ["demo"]
consumer_threads => 1
decorate_events => true
add_field => {"from" => "kafka appender"}
codec=>"json"
}
}
filter {
mutate {
rename => { "[host][name]" => "host" }
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "mylog"
}
stdout {
}
}
kafka&zookeeper
安装kafka2.2 并创建一个topic demo
#docker启动
#启动zookeeper
docker run --name zookeeper \
-v /opt/data/zksingle:/data \
-p 2181:2181 \
-e ZOO_LOG4J_PROP="INFO,ROLLINGFILE" \
-d zookeeper:3.4.13
#启动kafka
docker run -d --name kafka \
-p 9103:9092 \
--link zookeeper:zookeeper \
--env KAFKA_BROKER_ID=100 \
--env HOST_IP=192.168.200.182 \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.182 \
--env KAFKA_ADVERTISED_PORT=9103 \
--restart=always \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:2.12-2.2.2
#使用zk节点数据验证启动情况
docker exec -it zookeeper sh
#进入zookeeper后查看节点信息
ls /brokers
#进入容器
docker exec -it kafka sh
cd /opt/kafka_2.12-2.2.2/bin
#客户端监听(该步会自动创建topic)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
#另起一个终端,验证发送
./kafka-console-producer.sh --broker-list localhost:9092 --topic demo
filebeat
filebeat.yml配置文件
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /root/logs/*.log
    fields:
      from: filebeat
output.kafka:
  enabled: true
  hosts: ["192.168.200.182:9103"]
  topic: filebeat
  compression: gzip
processors:
  - drop_fields:
      fields: ["beat", "input", "source",
               "offset","metadata","timestamp","agent","ecs","fields"]
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
logging.level: info
name: filebeat-server-ip
启动命令
filebeat.sh
# 在filebeat的解压目录
cd /usr/local/src/filebeat-7.4.0-linux-x86_64
nohup ./filebeat -e -c filebeat.yml > /usr/local/src/logs/filebeat.log &
tail -f /usr/local/src/logs/filebeat.log
chmod +x filebeat.sh