Flume

flume 是一个数据搜集框架,负责采集各个服务器上的日志文件,并上传到 hdfs、kafka、其他服务器等。

flume由三部分组成:

  • source:用来搜集数据
  • channel: 用来暂时存放数据(可选择存到 内存 或文件 中 等)
  • sink: 将数据写入到某种介质中(hdfs、oracle等)

source

1. avro source

flume 通过avro方式在两台机器之间进行数据传输。

比如,192.168.17.18 上的数据 传到 192.168.17.17,
首先要再两台机器上都部署 flume,17 下的配置文件:

a2.sources.r1.type = avro  
a2.sources.r1.channels = c1  
a2.sources.r1.bind = 192.168.17.17  
a2.sources.r1.port = 1234  

在17上启动 flume:

flume-ng agent --conf ./conf/ -f conf/avro-flume.conf -Dflume.root.logger=DEBUG,console -n a2

在18上运行:

flume-ng avro-client -c ./conf -H 192.168.17.17 -p 1234 -F logs/test_data.log

这样 18 上的logs/test_data.log 的数据直接在 17上打印出来了,即 17 的控制台可以看到 18 的内容

2. exec source

exec 在启动时运行给定的Unix命令。

tail -f filename
说明:监视filename文件的尾部内容(默认10行)

对文件实时监控,如果发现文件有新日志,立刻收集并发送。
缺点:当你的服务器宕机重启后,此时数据读取还是从头开始。

agent.sources.s1.type=exec            
agent.sources.s1.command=tail -f /tmp/logs/kafka.log
3. spooldir source

对指定目录进行实时监控,如发现目录新增文件,立刻收集并发送

缺点:不能对目录文件进行修改,如果有追加内容的文本文件,不允许

a1.sources.s1.type = spooldir    
a1.sources.s1.spoolDir = /opt/data/flume/log
4. taildir source

可以监控一个目录下的多个文件,可以实现 实时读取文件。
实现了 ExecSource + SpoolDirectorySource 的功能。

taildirSource 对于log4j 日志有BUG:log4j 日志会自动切分,log4j 切分日志其实就是新建一个文件,然后把原来的日志文件都改名。但是 taildirSource 组件不支持文件改名。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读。
可以通过修改源码解决,网上有教程。

channel

1. Memory Channel

Memory Channel把Event保存在内存队列中,有最好的性能,不过也有数据可能会丢失的风险,如果Flume崩溃或者重启,那么保存在Channel中的Event都会丢失

#具体定义channel,容纳1000条数据,100条发送一次
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
2. File Channel

File Channel把Event保存在本地硬盘中,比Memory Channel提供更好的可靠性和可恢复性,不过要操作本地文件,性能要差一些。

配置项 默认值 说明
type 值为file
dataDir 保存路径

File Channel参考配置,a1为Agent实例名称。

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Kafka Channel

Kafka Channel把Event保存在Kafka集群中,能提供比File Channel更好的性能和比Memory Channel更高的可靠性。

配置项 默认值 说明
type org.apache.flume.channel.kafka.KafkaChannel
brokerList Kafka集群Broker列表
zookeeperConnect Kafka集群的ZooKeeper路径

Kafka Channel参考配置,a1为Agent实例名称。

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
a1.channels.channel1.brokerList=kafka-2:9092, kafka-3:9092
a1.channels.channel1.topic=channel1
a1.channels.channel1.zookeeperConnect=kafka-1:2181

Spillable Memory Channel

Spillable Memory Channel把Event保存到内存队列和本地文件中,数据优先存储在内存队列中,当内存队列满了以后会保存到文件中。Spillable Memory Channel在保持较高性能的同时,又能兼顾可靠性。

配置项 默认值 说明
type SPILLABLEMEMORY
memoryCapacity 10000 内存队列中数据最大数量
overflowCapacity 100000000 文件中数据最大数量

参考配置如下,a1为Agent实例名称。

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

sink

1. HDFS sink

接收器将事件写入Hadoop分布式文件系统(HDFS)

#具体定义sink,filePrefix:数据前缀。 hdfs://ns1/flume/%Y%m%d:指定path,%Y%m%d 指按照时间生成
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
# 使用文本文件,不使用sequenceFile  
a4.sinks.k1.hdfs.fileType = DataStream

#不按照条数生成文件,rollCount:多少条数据产生一个文件
a4.sinks.k1.hdfs.rollCount = 0

#HDFS上的文件达到128M时生成一个文件,即文件已经写了多次次之后,写新文件
a4.sinks.k1.hdfs.rollSize = 134217728
2. hive sink
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
3. hbase sink
4. avro sink

avro sink形成了Flume分层收集支持的一半。 发送到此接收器的Flume事件将转换为Avro事件并发送到配置的 主机名/端口对。

a1.sinks.k1.type = avro  
a1.sinks.k1.channel = c1  
a1.sinks.k1.hostname = rainbow.com.cn  
a1.sinks.k1.port = 4545

5. kafka sink

这是一个Flume Sink实现,可以将数据发布到Kafka主题。 其中一个目标是将Flume与Kafka集成

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.brokerList = rainbow.com.cn:9092  
a1.sinks.k1.topic = testTopic

可以在 .conf 文件中配置多个 source、channel、sink

1.多sink

channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。

配置文件如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp

2.多 channel 多sink ,每个sink 输出内容一致

(memory channel 用于kafka操作,实时性高,file channel 用于 sink file 数据安全性高)
(多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)

配置文件如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
#多个channel 的数据相同
a1.sources.r1.selector.type=replicating

# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp

3. 多source 单 channel 单 sink

多个source 可以读取多种信息放在一个channel 然后输出到同一个地方
配置文件如下:

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c

a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log

# channel1  in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,820评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,648评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,324评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,714评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,724评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,328评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,897评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,804评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,345评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,431评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,561评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,238评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,928评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,417评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,528评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,983评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,573评论 2 359

推荐阅读更多精彩内容