一、Flume Overview
#Official Description
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
#System Requirements
1. Java Runtime Environment - Java 1.8 or later
2. Memory - Sufficient memory for configurations used by sources, channels or sinks
3. Disk Space - Sufficient disk space for configurations used by channels or sinks
4. Directory Permissions - Read/Write permissions for directories used by agent
#Summary
In short, Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
Running it requires a Java 1.8 (or later) runtime, enough memory and disk space for the configured sources, channels, and sinks, and read/write permissions on the directories the agent uses.
二、Flume Architecture
The Flume architecture has three main components: Source, Channel, and Sink (a minimal wiring sketch follows the list below).
- When a Flume source receives an event, it stores it into one or more channels.
- The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem.
- The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow.
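To make the wiring concrete, a minimal agent definition in a Flume properties file looks roughly like the sketch below; the names a1, r1, c1, and k1 are placeholders, and the full working configuration used in this article appears in section 三.
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# a source can feed one or more channels
a1.sources.r1.channels = c1
# a sink drains exactly one channel
a1.sinks.k1.channel = c1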
三、Flume Example
Flume Source
Based on some research, three kinds of Source can be used to read log files:
Exec Source, Spooling Directory Source, and Taildir Source.
- Exec Source can watch a log for newly appended data, but if the agent dies it cannot resume from where it left off.
- Spooling Directory Source picks up new files added to a directory, but it cannot monitor data appended to a single file.
- Taildir Source writes a JSON position file to disk recording how far each log has been read; after the agent dies and restarts, it continues from the recorded position, so reading resumes where it stopped.
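For reference, the position file kept by Taildir Source is a small JSON array with one entry per tailed file, recording its inode, the byte offset read so far, and the file path; an entry looks roughly like this (the values are illustrative):
[{"inode": 2496272, "pos": 12345, "file": "/home/zulin/tomcat7-webgis/logs/localhost_access_log.2019-10-08.txt"}]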
Flume Sink
Since we need to write the log data to Kafka in real time, the Kafka Sink is used here. According to the official documentation, type and kafka.bootstrap.servers must be set: type to org.apache.flume.sink.kafka.KafkaSink, and kafka.bootstrap.servers to a hostname:port list of Kafka brokers.
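Put differently, a minimal Kafka Sink definition using the current (Flume 1.7+) property names would look roughly like this; the broker list below is a placeholder, and kafka.topic is optional but normally set explicitly:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hostname1:9092,hostname2:9092
a1.sinks.k1.kafka.topic = testflume
The configuration file below uses the older property names (brokerList, topic, etc.), which Flume still accepts as deprecated aliases.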
Flume Configuration File
Create a Flume configuration file named file-flume-kafka.conf:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zulin/tomcat7-webgis/logs/localhost_access_log.*.txt
# Position file that records how far each log has been read
a1.sources.r1.positionFile = /home/zulin/taildir_position.json
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = 90.0.25.8:9092,90.0.25.9:9092,90.0.25.10:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
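# Note: topic, brokerList, requiredAcks and batchSize are legacy property names;
# on Flume 1.7+ the preferred equivalents are kafka.topic, kafka.bootstrap.servers,
# kafka.producer.acks and kafka.flumeBatchSize.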
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Since the log files are named by date, e.g. localhost_access_log.2019-10-08.txt, the pattern “.*” is used in the filegroups path above in place of the current date.
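Note that the filegroups path of Taildir Source is matched against filenames as a regular expression rather than a shell glob, so the “.*” above is regex syntax that happens to cover the date portion. If tighter matching is wanted, a character class could be used instead, for example:
a1.sources.r1.filegroups.f1 = /home/zulin/tomcat7-webgis/logs/localhost_access_log.[0-9-]*.txt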
Start Flume
- Start Flume:
bin/flume-ng agent --conf conf --conf-file job/file-flume-kafka.conf --name a1
- Start a Kafka console consumer to read the data:
./kafka-console-consumer.sh --bootstrap-server hadoop08:9092,hadoop09:9092,hadoop10:9092 --topic testflume
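This assumes the testflume topic already exists or that the brokers are allowed to auto-create topics. If not, it can be created first with the Kafka topic tool; the exact flags depend on the Kafka version (older releases use --zookeeper, newer ones --bootstrap-server), and the ZooKeeper address, partition count, and replication factor below are only examples:
./kafka-topics.sh --create --zookeeper hadoop08:2181 --replication-factor 1 --partitions 3 --topic testflume
For debugging, -Dflume.root.logger=INFO,console can also be appended to the flume-ng command above to print the agent log to the console.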
Once both are running, the Kafka consumer output (left) and tail -f on the log file (right) show newly appended data almost simultaneously, i.e. the two are nearly in sync. The data in Kafka can then be processed in real time with Spark Streaming.