1.elk的搭建
2.kafka的配置以及启动
3.修改logstash配置文件
cd /usr/share/logstash/conf
sudo vim logstash.conf
input{
kafka {
bootstrap_servers => "localhost:9092"
topics => ["topic_log_info"]
auto_offset_reset => "latest"
consumer_threads => 1
codec => "plain"
type => "info"
}
kafka {
bootstrap_servers => "localhost:9092"
topics => ["topic_log_error"]
auto_offset_reset => "latest"
consumer_threads => 1
codec => "plain"
type => "error"
}
}
filter {
if ([message]== "")
{
drop {}
}
}
output {
if [type] == "error" {
elasticsearch {
hosts => [ "192.168.1.200:9200"]
index => "logstash-error"
}
}
if [type] == "info" {
elasticsearch {
hosts => [ "192.168.1.200:9200"]
index => "logstash-info"
}
}
stdout{
codec => rubydebug
}
}
保存退出
4.切换到es用户 su es
5.查看java -version 是否正确 不正确则执行
source /etc/profile
6.重新运行logstash
cd /usr/share/logstash
./bin/logstash -f conf/logstash.conf
7.测试
以python为例写了一个kafka发送测试消息