1、背景
- 将data路径下所有日志文件通过Flume采集到HDFS上
- 五分钟一个目录,一分钟形成一个文件
2、技术选型
flume中有三种可监控文件或目录的source,分别问exec、spooldir、taildir
exec:可通过tail -f命令去tail住一个文件,然后实时同步日志到sink
spooldir:可监听一个目录,同步目录中的新文件到sink,被同步完的文件可被立即删除或被打上标记。适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。
taildir:可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。
故本次选择 taildir - file - HDFS
3、配置agent
vi taildir-file-hdfs.conf
#agent_name
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#source的配置
# source类型
a1.sources.r1.type = TAILDIR
# 元数据位置
a1.sources.r1.positionFile = /home/hadoop/data/bd/taildir_position.json
# 监控的目录
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/hadoop/data/bd/.*log
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#sink的配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/offline/%Y%m%d/%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = bd
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize =67108864
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType=DataStream
#channel的配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/data/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/data
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 5000
#用channel链接source和sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel =c1
4、启动flume
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-file-hdfs.conf \
-Dflume.root.logger=INFO,console
5、模拟业务数据
- 编写shell脚本
vi 1.sh
#!/bin/bash
cat /home/hadoop/data/bd/1.log >> /home/hadoop/data/bd/bd.log
cat /home/hadoop/data/bd/2.log >> /home/hadoop/data/bd/bd.log
cat /home/hadoop/data/bd.log >> /home/hadoop/data/bd/bd.log
cat /home/hadoop/data/bd.log >> /home/hadoop/data/bd/bd1.log
cat /home/hadoop/data/bd/1.log >> /home/hadoop/data/bd/bd1.log
cat /home/hadoop/data/bd/2.log >> /home/hadoop/data/bd/bd1.log
cat /home/hadoop/data/bd/1.log >> /home/hadoop/data/bd/bd2.log
cat /home/hadoop/data/bd/2.log >> /home/hadoop/data/bd/bd2.log
- 编辑crontab,添加每分钟执行1.sh
[hadoop@hadoop001 data]$ chmod +x 1.sh
[hadoop@hadoop001 data]$ crontab -e
* * * * * sh /home/hadoop/data/1.sh