Flume学习一(基本source、channel、sink)

默认命名如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1 c2 c3 c4

agent_name:a1     source_name:r1     channel_name:c1     sink_name:k1

1、Interceptors(sources)

拦截器的作用范围是数据源到source之间,主要是为了给数据添加headers,最常用的是timestamp、host、static。timestamp类型可以配合hdfs sink的文件输出的日期格式(hdfs sink也可以用hdfs.useLocalTimeStamp来当时间戳,但是不能代表数据的产生时间)。可以用host和static来做数据来源界定,既可以用来标记数据,也可以用来做数据分发的时候使用(配合Channel Selectors)。但是默认的Interceptor只能针对web打标签,不能针对event打标签。如果需要对数据进行标志,需要修改代码才能实现,比如avro  source可以修改org.apache.flume.clients.log4jappender.Log4jAppender的实现来增加一些数据上的处理。

2、Channel Selectors

选择器的作用范围是source到channel之间,主要是为了确定每个Event到哪个channel。最常用的类型是replicating和multiplexing,replicating是默认的,能够把Event分别复制到每个channel。multiplexing可以根据Event的headers里的key的不同的值把数据分发到不同的channel。

3、Sink Processors

处理器的作用范围是channel到sink之间,主要是为了确定Events到哪个sink。默认的是default,最常用的是failover(故障转移)和load_balance(负载均衡)。

4、Flume sources

avro(监听端口):

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true #开启ip黑白名单认证,多个验证顺序执行,或关系,只要满足一个即验证结束

a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*#这个意思是阻止任何ip访问,仅允许ip127开头和localhost通过

#拦截器部分,其他类型source组件一样

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

#选择器部分,其他类型source组件一样

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=state

a1.sources.r1.selector.mapping.CZ=c1

a1.sources.r1.selector.mapping.US=c2 c3

a1.sources.r1.selector.default=c4

exec(常用量监控文件):

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F /var/log/secure

a1.sources.r1.batchSize=20 #批处理条数,每次上传数据的条数,不到不传

a1.sources.r1.batchTimeout=3000 #等待数据时间(单位毫秒),如果在时间内没有收集到20条数据也需要传输数据

spooldir(监控目录):

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp/

a1.sources.r1.fileSuffix = .org

a1.sources.r1.deletePolicy = never #读完删除与否,默认不删除(never),immediate立即删除

a1.sources.r1.fileHeader = true

a1.sources.r1.fileHeaderKey = ruoze_file #headers里添加ruoze_file=文件全路径

a1.sources.r1.basenameHeader = true

a1.sources.r1.basenameHeaderKey = ruoze_base #headers里添加ruoze_base=文件名

a1.sources.r1.trackerDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool

TAILDIR(多目录的文件监控):

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1 f2

a1.sources.r1.positionFile = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/positions/taildir_position.json

a1.sources.r1.filegroups.f1 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp

a1.sources.r1.headers.f1.ruoze = TAILDIR_f1

a1.sources.r1.byteOffsetHeader = true #字节偏移量放到headers

a1.sources.r1.writePosInterval = 3000 #偏移量记录时间间隔

a1.sources.r1.filegroups.f2 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool

a1.sources.r1.headers.f2.ruoze2 = TAILDIR_f2

netcat(监听端口):

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.max-line-length = 1024 #一行的最大长度,超出长度会报错并断掉连接

a1.sources.r1.ack-every-event = true #应答OK给每个Event


5、Flume channels

memory(内存管道):

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000#缓存最大Event条数

a1.channels.c1.transactionCapacity = 100#事物包括的Event最大条数,source和sink的batchSize需要小于该值

file(文件管道):

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = #监测点文件路径

a1.channels.c1.useDualCheckpoints = true #监测点文件是否备份

a1.channels.c1.backupCheckpointDir = #监测点文件备份路径

a1.channels.c1.dataDirs = #数据存储目录

6、Flume  sinks

logger(控制台打印信息):

a1.sinks.k1.type = logger

flume-ng启动时需要加上:-Dflume.root.logger=INFO,console

avro(往avro  source推送数据):

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost #目标ip

a1.sinks.k1.port = 44444 #目标端口

HDFS(写数据到HDFS):

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M  #这个时间的格式需要时间戳来进行转换,需要headers里有timestamp

a1.sinks.k1.hdfs.filePrefix = app_name #文件前缀

a1.sinks.k1.hdfs.fileSuffix = .log #文件后缀

a1.sinks.k1.hdfs.inUseSuffix = .tmp #正在写入文件后缀

a1.sinks.k1.hdfs.rollInterval = 30 #临时文件转正式文件的时间,单位秒,如果为0,则不限制,和rollSize、rollCount为或关系

a1.sinks.k1.hdfs.rollSize = 10485760 #临时文件转正式文件的大小,单位byte,如果为0,则不限制

a1.sinks.k1.hdfs.rollCount = 100000 #临时文件转正式文件记录条数,如果为0,则不限制

a1.sinks.k1.hdfs.round = true #是否启用时间上的”舍弃”。如果启用,则会影响除了%t的其他所有时间表达式

a1.sinks.k1.hdfs.roundValue = 10#时间上进行“舍弃”的值,这里是舍弃10分钟以内的值,意味着每10分钟新建一个目录

a1.sinks.k1.hdfs.roundUnit = minute #时间上进行”舍弃”的单位,包含:second,minute,hour

a1.sinks.k1.hdfs.useLocalTimeStamp = true #如果source里没有timestamp时间戳,则需要该参数为true

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容