默认命名如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2 c3 c4
agent_name:a1 source_name:r1 channel_name:c1 sink_name:k1
1、Interceptors(sources)
拦截器的作用范围是数据源到source之间,主要是为了给数据添加headers,最常用的是timestamp、host、static。timestamp类型可以配合hdfs sink的文件输出的日期格式(hdfs sink也可以用hdfs.useLocalTimeStamp来当时间戳,但是不能代表数据的产生时间)。可以用host和static来做数据来源界定,既可以用来标记数据,也可以用来做数据分发的时候使用(配合Channel Selectors)。但是默认的Interceptor只能针对web打标签,不能针对event打标签。如果需要对数据进行标志,需要修改代码才能实现,比如avro source可以修改org.apache.flume.clients.log4jappender.Log4jAppender的实现来增加一些数据上的处理。
2、Channel Selectors
选择器的作用范围是source到channel之间,主要是为了确定每个Event到哪个channel。最常用的类型是replicating和multiplexing,replicating是默认的,能够把Event分别复制到每个channel。multiplexing可以根据Event的headers里的key的不同的值把数据分发到不同的channel。
3、Sink Processors
处理器的作用范围是channel到sink之间,主要是为了确定Events到哪个sink。默认的是default,最常用的是failover(故障转移)和load_balance(负载均衡)。
4、Flume sources
avro(监听端口):
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.ipFilter = true #开启ip黑白名单认证,多个验证顺序执行,或关系,只要满足一个即验证结束
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*#这个意思是阻止任何ip访问,仅允许ip127开头和localhost通过
#拦截器部分,其他类型source组件一样
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = test_key
a1.sources.r1.interceptors.i3.value = test_value
#选择器部分,其他类型source组件一样
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.CZ=c1
a1.sources.r1.selector.mapping.US=c2 c3
a1.sources.r1.selector.default=c4
exec(常用量监控文件):
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /var/log/secure
a1.sources.r1.batchSize=20 #批处理条数,每次上传数据的条数,不到不传
a1.sources.r1.batchTimeout=3000 #等待数据时间(单位毫秒),如果在时间内没有收集到20条数据也需要传输数据
spooldir(监控目录):
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp/
a1.sources.r1.fileSuffix = .org
a1.sources.r1.deletePolicy = never #读完删除与否,默认不删除(never),immediate立即删除
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = ruoze_file #headers里添加ruoze_file=文件全路径
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = ruoze_base #headers里添加ruoze_base=文件名
a1.sources.r1.trackerDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool
TAILDIR(多目录的文件监控):
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/positions/taildir_position.json
a1.sources.r1.filegroups.f1 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp
a1.sources.r1.headers.f1.ruoze = TAILDIR_f1
a1.sources.r1.byteOffsetHeader = true #字节偏移量放到headers
a1.sources.r1.writePosInterval = 3000 #偏移量记录时间间隔
a1.sources.r1.filegroups.f2 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool
a1.sources.r1.headers.f2.ruoze2 = TAILDIR_f2
netcat(监听端口):
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.max-line-length = 1024 #一行的最大长度,超出长度会报错并断掉连接
a1.sources.r1.ack-every-event = true #应答OK给每个Event
5、Flume channels
memory(内存管道):
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000#缓存最大Event条数
a1.channels.c1.transactionCapacity = 100#事物包括的Event最大条数,source和sink的batchSize需要小于该值
file(文件管道):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = #监测点文件路径
a1.channels.c1.useDualCheckpoints = true #监测点文件是否备份
a1.channels.c1.backupCheckpointDir = #监测点文件备份路径
a1.channels.c1.dataDirs = #数据存储目录
6、Flume sinks
logger(控制台打印信息):
a1.sinks.k1.type = logger
flume-ng启动时需要加上:-Dflume.root.logger=INFO,console
avro(往avro source推送数据):
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost #目标ip
a1.sinks.k1.port = 44444 #目标端口
HDFS(写数据到HDFS):
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M #这个时间的格式需要时间戳来进行转换,需要headers里有timestamp
a1.sinks.k1.hdfs.filePrefix = app_name #文件前缀
a1.sinks.k1.hdfs.fileSuffix = .log #文件后缀
a1.sinks.k1.hdfs.inUseSuffix = .tmp #正在写入文件后缀
a1.sinks.k1.hdfs.rollInterval = 30 #临时文件转正式文件的时间,单位秒,如果为0,则不限制,和rollSize、rollCount为或关系
a1.sinks.k1.hdfs.rollSize = 10485760 #临时文件转正式文件的大小,单位byte,如果为0,则不限制
a1.sinks.k1.hdfs.rollCount = 100000 #临时文件转正式文件记录条数,如果为0,则不限制
a1.sinks.k1.hdfs.round = true #是否启用时间上的”舍弃”。如果启用,则会影响除了%t的其他所有时间表达式
a1.sinks.k1.hdfs.roundValue = 10#时间上进行“舍弃”的值,这里是舍弃10分钟以内的值,意味着每10分钟新建一个目录
a1.sinks.k1.hdfs.roundUnit = minute #时间上进行”舍弃”的单位,包含:second,minute,hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true #如果source里没有timestamp时间戳,则需要该参数为true