02_Flume之各种Source的介绍及参数解析

一、Source介绍

Source用于对接各种数据源,将收集到的事件发送到临时存储Channel中。

常用的source类型有:Avro Source、Exec Source、Kafka Source、TaildirSource、Spooling Directory Source等,其他类型source请查阅Flume-NG官方介绍。

source类型 说明
1 Thrift Source 支持Thrift协议,Thrift Source监听Thrift端口,接收外部Thrift客户端发送过来的Thrift Event数据。在多级流中,Thrift Source可以和前一个Flume Agent的Thrift Sink配对,建立分层收集拓扑。Thrift Source支持基于Kerberos身份验证的安全模式
2 Kafka Source 从Kafka Topic中读取数据,Kafka Source相当于消息队列的Consumer
3 Exec Source 基于Unix的command在标准输出上生产数据。Exec Source在启动时调用的Unix命令,该命令进程会持续地把标准日志数据输出到Exec Source,如果命令进程关闭,Exec Source也会关闭。Exec Source支持cat [named pipe]或者tail -F [file]命令。Exec Source最大的问题就是数据有可能丢失,因为当Channel接收Exec Source数据出错时或者抛出异常时,Exec Client并不能捕获到该错误
4 Spooling Directory Source 监控指定目录内数据变更。Spooling Directory Source监听系统上的指定目录,当目录中有新的文件创建时,Spooling Directory Source会把新文件的内容读取并推送到Channel中,并且把已读取的文件重命名成指定格式或者把文件删除。由于数据是以文件的形式存放的系统中,Spooling Directory Source的可靠性非常高,即使是Flume Agent崩溃或者重启,数据也可以恢复。Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录Spooling Directory Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。
5 Taildir Source Taildir Source是1.7版本的新特性,综合了Spooling Directory Source和Exec Source的优点。Taildir Source可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。使用时建议用1.8.0版本的flume,1.8.0版本中解决了Taildir Source一个可能会丢数据的bug。
6 Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为Event输入
7 HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式。其中GET主要用户测试,不建议生产环境使用。HTTP数据通过handler(实现HTTPSourceHandler接口)转换成Event,该handler接收HttpServletRequest并返回Event数组。如果handler出现异常,HTTP Source返回400错误。如果Channel满了或者Channel无法接收Event,HTTP Source返回503错误。
8 Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议。这个Source分成三类SyslogTCP Source、Multiport Syslog TCP Source(多端口)与SyslogUDP Source。其中TCP Source为每一个用回车(\ n)来分隔的字符串创建一个新的事件。而UDP Source将整个消息作为一个单一的事件。
9 JMS Source 从JMS系统(消息、主题)中读取数据,该Source目前只在ActiveMQ中测试
10 Stress Source 压力测试用。StressSource 是内部负载生成source的实现,允许用户配置Event有效载荷的大小。
11 Twitter 1% firehose Source 通过API持续下载Twitter数据,试验性质
12 Scribe Source Scribe是另一种类型的提取系统。采用现有的Scribe提取系统,Flume应该使用基于Thrift的兼容传输协议的ScribeSource。
13 Sequence Generator Source 序列生成器数据源,生产序列数据,实验性质
14 Avro Source 支持Avro协议(Avro RPC),Avro Source监听Avro端口,接收外部Avro客户端发送过来的Avro Event数据。在多级流中,Avro Source可以和前一个Flume Agent的Avro Sink配对,建立分层收集拓扑

1、Avro Source

支持Avro协议,接收RPC事件请求。Avro Source通过监听Avro端口接收外部Avro客户端流事件(event),在Flume的多层架构中经常被使用接收上游Sink发送的event。

① 配置参数解析:

参数 默认值 描述
type avro 类型名称
bind 绑定的IP
port 监听的端口
threads (重要)接收请求的线程数,当需要接收多个avro客户端的数据流时要设置合适的线程数,否则会造成avro客户端数据流积压。

② 简单模板

# 命名 Agent 上的组件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = avro
agent_name.sources.source_name.bind = 127.0.0.1
agent_name.sources.source_name.port = 9876
agent_name.sources.source_name.threads= 3

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 关联
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

说明:这里设置 avro 启动接收客户端数据流的最大线程数为3。

2、Kafka Source

对接分布式消息队列 Kafka,作为 Kafka 的消费者持续从kafka中拉取数据,如果多个 Kafka Source 同时消费kafka中同一个主题(topic),则 Kafka Source 的kafka.consumer.group.id 应该设置成相同的组id,多个 Kafka Source 之间不会消费重复的数据,每一个Source 都会拉取 Topic 下的不同数据。

① 配置参数解析:

参数 默认值 描述
type org.apache.flume.source.kafka.KafkaSource 类型名称
kafka.bootstrap.servers Kafka broker列表,格式为ip1:port1,ip2:port2……,建议配置多个值提高容错能力,多个值之间用逗号隔开。
kafka.topics 消费的topic名称。
kafka.topics.regex 通过正则表达式匹配一组topic,设置此选项会覆盖kafka.topics选项的设置。
kafka.consumer.group.id flume kafka source所属组id
batchSize 1000 批量写入channel的最大消息数
batchDurationMillis 1000 等待批量写入channel的最长时间,这个参数和batchSize两个参数只要有一个满足都会触发批量写入channel操作,单位:毫秒

② 简单模板

# 命名 Agent 上的组件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = org.apache.flume.source.kafka.KafkaSource
agent_name.sources.source_name.batchSize = 5000
agent_name.sources.source_name.batchDurationMillis = 2000
agent_name.sources.source_name.kafka.bootstrap.servers = zkServer01:9092,zkServer02:9092
agent_name.sources.source_name.kafka.topics = test_topic
agent_name.sources.source_name.kafka.consumer.group.id = flume_consumer_test

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 关联
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

说明:这里设置 Source 批量写入 Channel 的最大消息数为5000,Source 等待批量写入 Channel 的最长时间为2秒,Channel 拉取数据的 Kafka Broker 列表为zkServer01:9092,zkServer02:9092,Source 消费的主题名称为 test_topic ,Source 所属的consumer group id 为 flume_consumer_test。

3、Exec Source

支持Linux命令,收集标准输出数据或者通过tail -f file的方式监听指定文件。ExecSource可以实现实时的消息传输,但是它不记录已经读取文件的位置,不支持断点续传,如果Exec Source重启或者挂掉都会造成后续增加的消息丢失,建议只是在测试环境使用。

① 配置参数解析:

参数 默认值 描述
type exec 类型名称
command Linux命令

② 简单模板

# 命名 Agent 上的组件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = exec
agent_name.sources.source_name.command = tail -F /var/log/test.log

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 关联
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

说明:这里设置 Source 通过tail -F命令监听/var/log/test.log文件。

4、Spooling Directory Source

监听一个文件夹,收集文件夹下新文件数据,收集完新文件数据会将文件名称的后缀改为.COMPLETED,缺点是不支持老文件新增数据的收集,并且不能够对嵌套文件夹递归监听。

① 配置参数解析:

参数 默认值 描述
type spooldir 类型名称
spoolDir source监听的文件夹。
fileHeader false 是否添加文件的绝对路径到event的header中
fileHeaderKey file 添加到event header中文件绝对路径的键值
selector.type replicating 选择器类型(可选值为replicating或multiplexing)
fileSuffix COMPLETED 收集完新文件数据给文件添加的后缀名称

② 简单模板

# 命名 Agent 上的组件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = spooldir
agent_name.sources.source_name.spoolDir = /var/log/test.log
agent_name.sources.source_name.fileHeader = true

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 关联
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

说明:这里设置 Source 监听的文件夹路径为/usr/local/flume/log,在 event 头信息中添加文件绝对路径信息。

5、Taildir Source (建议修改源码,后续补充相关操作文章)

监听一个文件或文件夹,通过正则表达式匹配需要监听的数据源文件,支持文件夹嵌套递归监听(重要source), Taildir Source将通过监听的文件位置写入到文件中实现断点续传,并且能够保证没有重复数据的读取。

① 配置参数解析:

参数 默认值 描述
type TAILDIR 类型名称
positionFile 保存监听文件读取位置的文件路径
skipToEnd false 在位置文件中没有保存监听文件的位置是否直接跳到文件的末尾
idleTimeout 120000 关闭空闲文件延迟时间,如果有新的记录添加到已关闭的空闲文件taildir srouce将继续打开该空闲文件,单位:毫秒。
writePosInterval 3000 向保存读取位置文件中写入读取文件位置的时间间隔(单位:毫秒)
batchSize 100 批量写入channel最大event数
maxBackoffSleep 5000 每次最后一次尝试没有获取到监听文件最新数据的最大延迟时间,单位:毫秒。
backoffSleepIncrement 1000 每次最后一次尝试没有获取到监听文件最新数据后增加延迟时间的幅度
cachePatternMatching true 监听的文件夹下通过正则表达式匹配的文件数量可能会很多,将匹配成功的监听文件列表和读取文件列表的顺序都添加到缓存中可以提高性能
fileHeader false 是否添加文件的绝对路径到event的header中
fileHeaderKey file 添加到event header中文件绝对路径的键值
filegroups 监听的文件组列表,taildirsource通过文件组监听多个目录或文件
filegroups.<filegroupName> 表达式路径或者监听指定文件路径

② 简单模板

# 命名 Agent 上的组件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = TAILDIR
agent_name.sources.source_name.filegroups = f1 f2
agent_name.sources.source_name.filegroups.f1 = /var/log/test_01.log
agent_name.sources.source_name.filegroups.f2 = /var/log/test_02/*.log*
agent_name.sources.source_name.fileHeader = true
agent_name.sources.source_name.positionFile = /usr/local/flume/position/taildir_position.json

# channel
agent_name.channels.channel_name.type = file
XXX
XXX

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 关联
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

说明:保存监听文件读取位置信息的文件路径为/usr/local/flume/position/taildir_position.json,监听文件列表包含两个监听文件组f1、f2,f1监听指定log文件/var/log/test_01.log ,f2通过正则表达式匹配/var/log/test_02/路径下包含log关键字的所有文件,并且将文件的绝对路径添加到event的头信息中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容