Flume-1.8.0(三)Source支持的类型

avro 类型的 source

监听 Avro 端口来接收外部 avro 客户端的事件流。和 netcat 不同的是,avro-source 接收到的是经过 avro 序列化后的数据,然后反序列化数据继续传输。所以,如果是 avro-source 的话,源数据必须经过 avro 序列化后的数据。而 netcat 接收的是字符串格式。

利用avro source可以实现多级流动、扇出流、扇入流等效果,另外,也可以接收通过flume提供的avro客户端发送的日志信息。

avro source配置说明

在 /opt/software/flume-1.8.0/conf 下创建 source-avro.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

启动

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console

测试

在 node103 的 flume 执行命令,把配置文件发过去

./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/

node113 接收会打印

exec 类型的 source

可以将命令产生的输出做为源

exec 配置

在 /opt/software/flume-1.8.0/conf 下创建 source-exec.conf
将type改成exec,并添加 command 命令,会执行命令做为 source 的数据源。

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

spooling directory 类型的 source

将指定得文件加入到"自动搜集"目录中。flume会持续监听这个目录,把文件当作source来处理。注意:一旦文件被放到 “自动收集” 目录中,便不能修改,如果修改,flume 会报错。此外,他不能有重名的文件,否则也会报错。

当一个文件被 flume 读了以后,会在末尾 添加 .COMPLETED 标识

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-spooldir.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=spooldir
# 目录需要提前建立
a1.sources.r1.spoolDir=/home/data

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

sequence generator source(序列发生源) 类型的 source

一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1.主要用来测试。测试消费能力。

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-seq.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=seq

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

http 类型的 source

此 source 接受 htpp 的 get 和 post 请求做为f lume 的事件。其中 get 方式应该只用于试验。

如果想让flume正确解析http协议信息,比如解析出请求头、请求体等信息,需要提供一个可插拔的 “处理器” 来将请求转换为事件对象,这个处理器必须实现 HTTPSourceHandler 接口。

这个处理器接受一个 HttpServletRequest 对象,并返回一个 Flume Event 对象集合。

Flume 提供了一些常用的 Handler(处理器)。

  • JSONHandler
    可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32 字符集,该 handler 接受 Event 数组,并根据请求头中的编码将其转换位 Flume Event,如果没有指定的编码,默认编码为 UTF-8.

spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-http.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

启动测试

启动

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console

测试,从node103发送数据

curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888

node113接收数据

2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F                hello hello }

Kafka 类型

flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据。flume-kafka-source 的offset是交由zk集群去维护offset。

flume 属于单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。

属性 默认值 描述
channels 配置的channels 可配置多个channels 后续文章会说到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka集群地址
kafka.consumer.group.id flume 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分
kafka.topics 你需要消费的topic
kafka.topics.regex 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics,如果存在则覆盖kafka.topics
batchSize 1000 一批中写入Channel的最大消息数 (优化项)
batchDurationMillis 1000 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
backoffSleepIncrement 1000 Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
maxBackoffSleep 5000 Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
useFlumeEventFormat false 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。
setTopicHeader true 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader属性定义。
topicHeader topic 如果setTopicHeader属性设置为true,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息
kafka.consumer.security.protocol PLAINTEXT 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。
Other Kafka Consumer Properties 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset

Kafka source 覆盖了两个 Kafka 消费者参数:source 将 auto.commit.enable 设置为“false”,以批次进行提交。Kafka source 保证至少一次消息检索策略。source 启动时可能会出现重复项。Kafka Source 还为key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默认值。不建议修改这些参数。

#1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 设置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批写入 Channel 的最大消息数
a1.sources.r1.batchSize=5000
# 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1


# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1

agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每2秒钟拉取 kafka 一批数据,批数据大小为5000放入到 flume-channels 中 。这两个值总和考虑以下两项:

  • 需要配置kafka单条数据 broker.conf 中配置 message.max.bytes
  • 当前flume channel sink 组件最大消费能力如何

文档地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst

Taildir Source

在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。

这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

Taildir Source可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容