Environment: flume-1.6, kafka_2.11-0.9.0.0. The goal is to stream the log file /usr/local/nohup.out into Kafka in real time for downstream processing, such as archiving the logs to HDFS or integrating with Storm or Spark for real-time log analysis.
1. Configure Kafka:
1) server.properties:
host.name=localhost
log.dirs=/usr/local/kafka-logs
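For a single-node test these are the only changes needed; the remaining relevant settings can stay at the stock 0.9 defaults (a sketch, assuming the shipped config file):
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181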
2) zookeeper.properties:
dataDir=/usr/local/zookeeper/data
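The bundled zookeeper.properties also sets the client port by default, which must match the --zookeeper addresses and zookeeper.connect used below:
clientPort=2181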
3) Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
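To confirm ZooKeeper is up before starting the broker, its four-letter-word interface can be queried (assuming nc is installed and the default port 2181; a healthy server answers imok):
echo ruok | nc localhost 2181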
2. Start Kafka:
1) Start the broker: bin/kafka-server-start.sh config/server.properties
2) Create the topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
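The new topic can be verified with:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test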
3) Start a consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
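Before wiring in Flume, the path can be smoke-tested with the console producer; anything typed into it should appear in the consumer above:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test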
3. Configure Flume:
1) flume-conf.properties:
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = exec
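# tail -F (unlike tail -f) keeps following the file across rotation or recreation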
producer.sources.s.command = tail -F /usr/local/nohup.out
producer.sources.s.channels = c
# Each sink's type must be defined
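# Note: org.apache.flume.plugins.KafkaSink is not shipped with Flume 1.6; it comes
# from a third-party Kafka sink plugin whose jar (plus the Kafka client jars) must
# be on Flume's classpath, e.g. under flume/lib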
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.custom.topic.name = test
producer.sinks.r.metadata.broker.list = 127.0.0.1:9092
producer.sinks.r.partition.key = 0
producer.sinks.r.partitioner.class = org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 1
producer.sinks.r.max.message.size = 1000000
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
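Flume 1.6 also ships a built-in Kafka sink that makes the external plugin unnecessary. A minimal equivalent sink section, assuming the same topic and broker, with property names per the Flume 1.6 user guide:
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = test
producer.sinks.r.brokerList = 127.0.0.1:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 100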
4. Start Flume:
1) bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
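Once the agent is running, appending a line to the tailed file should show up in the console consumer within a second or two:
echo "flume to kafka test" >> /usr/local/nohup.out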