1. Download the package and extract it
Download URL: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1.tar.gz
Extract it and create a convenience symlink:
[hadoop@hadoop000 software]$ tar -zxvf flume-ng-1.6.0-cdh5.15.1.tar.gz -C ../app/
[hadoop@hadoop000 app]$ ln -s apache-flume-1.6.0-cdh5.15.1-bin flume
2. Configure environment variables
[hadoop@hadoop000 ~]$ vi .bash_profile
export FLUME_HOME=/home/hadoop/app/flume
export PATH=${FLUME_HOME}/bin:${PATH}
[hadoop@hadoop000 ~]$ source .bash_profile
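A quick sanity check that the two exports actually landed on the PATH; a minimal sketch, assuming the paths used above (on the real machine you would then run flume-ng version to confirm the install):

```shell
# Re-create the two exports from .bash_profile and confirm PATH picks up Flume's bin
export FLUME_HOME=/home/hadoop/app/flume
export PATH=${FLUME_HOME}/bin:${PATH}
echo "$PATH" | grep -q "${FLUME_HOME}/bin" && echo "PATH ok"
```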
3. Set JAVA_HOME in flume-env.sh
[hadoop@hadoop000 conf]$ cp flume-env.sh.template flume-env.sh
[hadoop@hadoop000 conf]$ vi flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_45
4. Usage example
- Requirement: monitor data under a directory, write it to HDFS, and partition it.
Component | Choice |
---|---|
Source | taildir |
Channel | memory |
Sink | HDFS / logger (for testing) |
- Config file (test version, logger sink)
[hadoop@hadoop000 flume]$ cat taildir-memory-logger.conf
#define agent
taildir-hdfs-agent.sources=taildir-source
taildir-hdfs-agent.channels=taildir-memory-channel
taildir-hdfs-agent.sinks=hdfs-sink
#define source
taildir-hdfs-agent.sources.taildir-source.type=TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups=f1 f2
taildir-hdfs-agent.sources.taildir-source.filegroups.f1=/home/hadoop/data/taildir/test1/example.log
taildir-hdfs-agent.sources.taildir-source.filegroups.f2=/home/hadoop/data/taildir/test2/.*log.*
taildir-hdfs-agent.sources.taildir-source.positionFile=/home/hadoop/position/taildir_position.json
#define channel
taildir-hdfs-agent.channels.taildir-memory-channel.type=memory
#define sink
taildir-hdfs-agent.sinks.hdfs-sink.type=logger
#bind source and sink to channel
taildir-hdfs-agent.sources.taildir-source.channels=taildir-memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel=taildir-memory-channel
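To exercise both filegroups you can append lines that match each pattern (f1 is a fixed file, f2 a regex). A minimal sketch, using /tmp/taildir-demo as a stand-in for the configured /home/hadoop/data/taildir base:

```shell
# Create sample files matching the two taildir filegroups
base=/tmp/taildir-demo
mkdir -p "$base/test1" "$base/test2"
echo "line for filegroup f1" >> "$base/test1/example.log"
echo "line for filegroup f2" >> "$base/test2/app.log.1"   # matches .*log.*
ls "$base/test1" "$base/test2"
```

The taildir source records its read offsets per file in positionFile, so appending more lines later only ships the new data.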
- Start command
[hadoop@hadoop000 flume]$ flume-ng \
agent \
-n taildir-hdfs-agent \
-f /home/hadoop/script/flume/taildir-memory-logger.conf \
-c ${FLUME_HOME}/conf \
-Dflume.root.logger=INFO,console
Important
The most important option is --name (or -n): it specifies the name of the agent to start.
This name MUST match the agent name used in the configuration file (here taildir-hdfs-agent, the prefix of every property line).
- Config file (HDFS sink)
[hadoop@hadoop000 flume]$ cat taildir-memory-hdfs.conf
#define agent
taildir-hdfs-agent.sources=taildir-source
taildir-hdfs-agent.channels=taildir-memory-channel
taildir-hdfs-agent.sinks=hdfs-sink
#define source
taildir-hdfs-agent.sources.taildir-source.type=TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups=f1 f2
taildir-hdfs-agent.sources.taildir-source.filegroups.f1=/home/hadoop/data/taildir/test1/example.log
taildir-hdfs-agent.sources.taildir-source.filegroups.f2=/home/hadoop/data/taildir/test2/.*log.*
taildir-hdfs-agent.sources.taildir-source.positionFile=/home/hadoop/position/taildir_position.json
#define channel
taildir-hdfs-agent.channels.taildir-memory-channel.type=memory
#define sink
taildir-hdfs-agent.sinks.hdfs-sink.type=hdfs
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.path=hdfs://hadoop000:9000/flume/events/%y-%m-%d-%H-%M
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix=events-
# Roll the current file after this many seconds
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval=120
# Roll the current file after this many bytes (in production this is usually set to 126-127 MB, slightly below the 128 MB HDFS block size, because at exactly 128 MB a single event could end up split across two files)
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize=1024
# Roll the current file after this many events
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount=10
# The file rolls as soon as ANY of the three conditions above is met. The file currently
# being written carries a .tmp suffix, so make sure hdfs.rollInterval is set: otherwise,
# if neither of the other two thresholds is reached before the sink moves on to the next
# partition directory, the file keeps its .tmp suffix forever.
# DataStream: output files are not compressed
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.fileType=DataStream
# Use the agent's local time for the timestamp escapes (must be true here, since we partition by time)
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp=true
# Round down the event timestamp used for partitioning (a partition directory is only created if it actually receives data)
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.round=true
# Round down to a multiple of this value
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.roundValue=2
# Unit of the rounding value
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.roundUnit=minute
#bind source and sink to channel
taildir-hdfs-agent.sources.taildir-source.channels=taildir-memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel=taildir-memory-channel
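With round=true, roundValue=2 and roundUnit=minute, the minute used for %M is rounded down to a multiple of 2. A sketch of the arithmetic, just to illustrate the effect:

```shell
# hdfs.round with roundValue=2 / roundUnit=minute: an event at minute 03
# is written into the partition directory for minute 02
minute=3
rounded=$(( minute - minute % 2 ))
printf 'event minute %02d -> partition minute %02d\n' "$minute" "$rounded"
# prints: event minute 03 -> partition minute 02
```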
- Start command
[hadoop@hadoop000 flume]$ flume-ng \
agent \
-n taildir-hdfs-agent \
-f /home/hadoop/script/flume/taildir-memory-hdfs.conf \
-c ${FLUME_HOME}/conf \
-Dflume.root.logger=INFO,console
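The %y-%m-%d-%H-%M escapes in hdfs.path are expanded from the event timestamp (here the agent's local clock, since useLocalTimeStamp=true). The date command understands the same format, so you can preview what a partition directory will look like:

```shell
# Preview the directory name the HDFS sink would write to right now
# (same escape sequences as hdfs.path, expanded by date instead of Flume)
date +"/flume/events/%y-%m-%d-%H-%M"
```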
5. Fan-in example (two upstream agents merging into one downstream agent)
Source | Channel | Sink |
---|---|---|
nc | memory | avro |
taildir | memory | avro |
avro | memory | logger |
- nc-memory-avro
[hadoop@hadoop000 multi-agent]$ vi nc-memory-avro.conf
#define agent
nc-avro-agent.sources=nc-source
nc-avro-agent.channels=nc-memory-channel
nc-avro-agent.sinks=avro-sink
#define source
nc-avro-agent.sources.nc-source.type=netcat
nc-avro-agent.sources.nc-source.bind=0.0.0.0
nc-avro-agent.sources.nc-source.port=44444
#define channel
nc-avro-agent.channels.nc-memory-channel.type=memory
#define sink
nc-avro-agent.sinks.avro-sink.type=avro
nc-avro-agent.sinks.avro-sink.hostname=hadoop000
nc-avro-agent.sinks.avro-sink.port=44445
#bind source and sink to channel
nc-avro-agent.sources.nc-source.channels=nc-memory-channel
nc-avro-agent.sinks.avro-sink.channel=nc-memory-channel
- taildir-memory-avro
[hadoop@hadoop000 multi-agent]$ vi taildir-memory-avro.conf
#define agent
taildir-avro-agent.sources=taildir-source
taildir-avro-agent.channels=taildir-memory-channel
taildir-avro-agent.sinks=avro-sink
#define source
taildir-avro-agent.sources.taildir-source.type=TAILDIR
taildir-avro-agent.sources.taildir-source.filegroups=f1 f2
taildir-avro-agent.sources.taildir-source.filegroups.f1=/home/hadoop/data/taildir/test1/example.log
taildir-avro-agent.sources.taildir-source.filegroups.f2=/home/hadoop/data/taildir/test2/.*log.*
taildir-avro-agent.sources.taildir-source.positionFile=/home/hadoop/position/taildir_position.json
#define channel
taildir-avro-agent.channels.taildir-memory-channel.type=memory
#define sink
taildir-avro-agent.sinks.avro-sink.type=avro
taildir-avro-agent.sinks.avro-sink.hostname=hadoop000
taildir-avro-agent.sinks.avro-sink.port=44445
#bind source and sink to channel
taildir-avro-agent.sources.taildir-source.channels=taildir-memory-channel
taildir-avro-agent.sinks.avro-sink.channel=taildir-memory-channel
- avro-memory-logger
[hadoop@hadoop000 multi-agent]$ vi avro-memory-logger.conf
#define agent
avro-logger-agent.sources=avro-source
avro-logger-agent.channels=avro-memory-channel
avro-logger-agent.sinks=logger-sink
#define source
avro-logger-agent.sources.avro-source.type=avro
avro-logger-agent.sources.avro-source.bind=hadoop000
avro-logger-agent.sources.avro-source.port=44445
#define channel
avro-logger-agent.channels.avro-memory-channel.type=memory
#define sink
avro-logger-agent.sinks.logger-sink.type=logger
#bind source and sink to channel
avro-logger-agent.sources.avro-source.channels=avro-memory-channel
avro-logger-agent.sinks.logger-sink.channel=avro-memory-channel
- Start commands (start avro-logger-agent first: the avro sinks of the two upstream agents connect to its avro source)
flume-ng agent -n avro-logger-agent -f /home/hadoop/script/flume/multi-agent/avro-memory-logger.conf -c ${FLUME_HOME}/conf -Dflume.root.logger=INFO,console
flume-ng agent -n nc-avro-agent -f /home/hadoop/script/flume/multi-agent/nc-memory-avro.conf -c ${FLUME_HOME}/conf -Dflume.root.logger=INFO,console
flume-ng agent -n taildir-avro-agent -f /home/hadoop/script/flume/multi-agent/taildir-memory-avro.conf -c ${FLUME_HOME}/conf -Dflume.root.logger=INFO,console
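Since the upstream avro sinks need the downstream avro source to be listening, it can help to wait for its port before launching them. A sketch of such a helper, assuming bash (it relies on bash's /dev/tcp pseudo-device) and the hadoop000:44445 endpoint from the configs above:

```shell
# Wait until host:port accepts TCP connections, retrying once per second
wait_for_port() {
  local host=$1 port=$2 tries=$3 i
  for i in $(seq "$tries"); do
    # bash-only: opening /dev/tcp/host/port succeeds iff something is listening
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
  done
  return 1
}
# After launching avro-logger-agent, wait for its avro source, then start the others
wait_for_port hadoop000 44445 3 && echo "avro source is up" || echo "avro source not reachable yet"
```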