flume 是一个数据搜集框架,负责采集各个服务器上的日志文件,并上传到 hdfs、kafka、其他服务器等。
flume由三部分组成:
- source:用来搜集数据
- channel: 用来暂时存放数据(可选择存到 内存 或文件 中 等)
- sink: 将数据写入到某种介质中(hdfs、oracle等)
source
1. avro source
flume 通过avro方式在两台机器之间进行数据传输。
比如,192.168.17.18 上的数据 传到 192.168.17.17,
首先要再两台机器上都部署 flume,17 下的配置文件:
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.17.17
a2.sources.r1.port = 1234
在17上启动 flume:
flume-ng agent --conf ./conf/ -f conf/avro-flume.conf -Dflume.root.logger=DEBUG,console -n a2
在18上运行:
flume-ng avro-client -c ./conf -H 192.168.17.17 -p 1234 -F logs/test_data.log
这样 18 上的logs/test_data.log 的数据直接在 17上打印出来了,即 17 的控制台可以看到 18 的内容
2. exec source
exec 在启动时运行给定的Unix命令。
tail -f filename
说明:监视filename文件的尾部内容(默认10行)
对文件实时监控,如果发现文件有新日志,立刻收集并发送。
缺点:当你的服务器宕机重启后,此时数据读取还是从头开始。
agent.sources.s1.type=exec
agent.sources.s1.command=tail -f /tmp/logs/kafka.log
3. spooldir source
对指定目录进行实时监控,如发现目录新增文件,立刻收集并发送
缺点:不能对目录文件进行修改,如果有追加内容的文本文件,不允许
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data/flume/log
4. taildir source
可以监控一个目录下的多个文件,可以实现 实时读取文件。
实现了 ExecSource + SpoolDirectorySource 的功能。
taildirSource 对于log4j 日志有BUG:log4j 日志会自动切分,log4j 切分日志其实就是新建一个文件,然后把原来的日志文件都改名。但是 taildirSource 组件不支持文件改名。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读。
可以通过修改源码解决,网上有教程。
channel
1. Memory Channel
Memory Channel把Event保存在内存队列中,有最好的性能,不过也有数据可能会丢失的风险,如果Flume崩溃或者重启,那么保存在Channel中的Event都会丢失
#具体定义channel,容纳1000条数据,100条发送一次
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
2. File Channel
File Channel把Event保存在本地硬盘中,比Memory Channel提供更好的可靠性和可恢复性,不过要操作本地文件,性能要差一些。
配置项 | 默认值 | 说明 |
---|---|---|
type | 值为file | |
dataDir | 保存路径 |
File Channel参考配置,a1为Agent实例名称。
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Kafka Channel
Kafka Channel把Event保存在Kafka集群中,能提供比File Channel更好的性能和比Memory Channel更高的可靠性。
配置项 | 默认值 | 说明 |
---|---|---|
type | org.apache.flume.channel.kafka.KafkaChannel | |
brokerList | Kafka集群Broker列表 | |
zookeeperConnect | Kafka集群的ZooKeeper路径 |
Kafka Channel参考配置,a1为Agent实例名称。
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
a1.channels.channel1.brokerList=kafka-2:9092, kafka-3:9092
a1.channels.channel1.topic=channel1
a1.channels.channel1.zookeeperConnect=kafka-1:2181
Spillable Memory Channel
Spillable Memory Channel把Event保存到内存队列和本地文件中,数据优先存储在内存队列中,当内存队列满了以后会保存到文件中。Spillable Memory Channel在保持较高性能的同时,又能兼顾可靠性。
配置项 | 默认值 | 说明 |
---|---|---|
type | SPILLABLEMEMORY | |
memoryCapacity | 10000 | 内存队列中数据最大数量 |
overflowCapacity | 100000000 | 文件中数据最大数量 |
参考配置如下,a1为Agent实例名称。
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
sink
1. HDFS sink
接收器将事件写入Hadoop分布式文件系统(HDFS)
#具体定义sink,filePrefix:数据前缀。 hdfs://ns1/flume/%Y%m%d:指定path,%Y%m%d 指按照时间生成
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
# 使用文本文件,不使用sequenceFile
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件,rollCount:多少条数据产生一个文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件,即文件已经写了多次次之后,写新文件
a4.sinks.k1.hdfs.rollSize = 134217728
2. hive sink
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
3. hbase sink
4. avro sink
avro sink形成了Flume分层收集支持的一半。 发送到此接收器的Flume事件将转换为Avro事件并发送到配置的 主机名/端口对。
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = rainbow.com.cn
a1.sinks.k1.port = 4545
5. kafka sink
这是一个Flume Sink实现,可以将数据发布到Kafka主题。 其中一个目标是将Flume与Kafka集成
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = rainbow.com.cn:9092
a1.sinks.k1.topic = testTopic
可以在 .conf 文件中配置多个 source、channel、sink
1.多sink
channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。
配置文件如下:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp
2.多 channel 多sink ,每个sink 输出内容一致
(memory channel 用于kafka操作,实时性高,file channel 用于 sink file 数据安全性高)
(多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)
配置文件如下:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
#多个channel 的数据相同
a1.sources.r1.selector.type=replicating
# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp
3. 多source 单 channel 单 sink
多个source 可以读取多种信息放在一个channel 然后输出到同一个地方
配置文件如下:
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log
# channel1 in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy