1. Start ZooKeeper
./zkServer.sh start
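If needed, zkServer.sh can confirm that ZooKeeper is actually running before Kafka is started:
# Should report a running mode, e.g. Mode: standalone
./zkServer.sh status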
2. Start Kafka as a daemon
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
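Because -daemon puts the broker in the background, it is worth checking that the process came up; a quick check with jps (the log path assumes Kafka's default log4j settings):
# A "Kafka" JVM process should be listed
jps
# Startup errors, if any, are written to the broker log
tail -n 50 $KAFKA_HOME/logs/server.log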
3. Create the topic (via ZooKeeper)
kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic logstash_topic
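To verify the topic was created, the same script can list or describe topics against ZooKeeper:
# List topics registered in ZooKeeper
kafka-topics.sh --list --zookeeper hadoop:2181
# Show partition and replica assignment of the new topic
kafka-topics.sh --describe --zookeeper hadoop:2181 --topic logstash_topic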
4. Produce messages (via the broker list)
kafka-console-producer.sh --broker-list hadoop:9092 --topic logstash_topic
5. Consume messages (via ZooKeeper)
kafka-console-consumer.sh --zookeeper hadoop:2181 --topic logstash_topic --from-beginning
6. Edit the configuration file (an optional variant of the file input follows the config block below)
vi kafka.conf
input {
  file {
    # Tail the test file; by default only lines appended after startup are read
    path => "/home/feiyue/app/logstash-2.4.1/test.txt"
  }
}
output {
  kafka {
    # Publish each event to the topic created in step 3
    topic_id => "logstash_topic"
    bootstrap_servers => "hadoop:9092"
    batch_size => 1
  }
}
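If Logstash should also pick up lines that already exist in test.txt (rather than only lines appended after it starts), the file input's start_position and sincedb_path options can be set; an optional sketch for testing, not part of the original setup:
input {
  file {
    path => "/home/feiyue/app/logstash-2.4.1/test.txt"
    start_position => "beginning"  # read existing content on the first run
    sincedb_path => "/dev/null"    # do not remember read offsets between runs (testing only)
  }
}
# the output block stays the same as above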
7. Start Logstash
bin/logstash -f kafka.conf
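Logstash 2.x can also validate the configuration before starting the pipeline, which catches syntax mistakes in kafka.conf early:
# Parses kafka.conf and exits without starting the pipeline
bin/logstash -f kafka.conf --configtest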
8. In a new window, append content to the test.txt from step 6:
echo "kkkkk" >> test.txt
The window running kafka-console-consumer.sh should then print the content in JSON form.
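For reference, the kafka output's default json codec serializes the whole Logstash event, so the consumer window should print something roughly like the line below (timestamp, host, and field order will differ; the values here are only illustrative):
{"message":"kkkkk","@version":"1","@timestamp":"2017-01-01T00:00:00.000Z","path":"/home/feiyue/app/logstash-2.4.1/test.txt","host":"hadoop"}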
Reference: https://www.elastic.co/guide/en/logstash/2.4/plugins-outputs-kafka.html