avro 类型的 source
监听 Avro 端口来接收外部 avro 客户端的事件流。和 netcat 不同的是,avro-source 接收到的是经过 avro 序列化后的数据,然后反序列化数据继续传输。所以,如果是 avro-source 的话,源数据必须经过 avro 序列化后的数据。而 netcat 接收的是字符串格式。
利用avro source可以实现多级流动、扇出流、扇入流等效果,另外,也可以接收通过flume提供的avro客户端发送的日志信息。
avro source配置说明
在 /opt/software/flume-1.8.0/conf 下创建 source-avro.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
启动
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console
测试
在 node103 的 flume 执行命令,把配置文件发过去
./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/
node113 接收会打印
exec 类型的 source
可以将命令产生的输出做为源
exec 配置
在 /opt/software/flume-1.8.0/conf 下创建 source-exec.conf
将type改成exec,并添加 command 命令,会执行命令做为 source 的数据源。
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
spooling directory 类型的 source
将指定得文件加入到"自动搜集"目录中。flume会持续监听这个目录,把文件当作source来处理。注意:一旦文件被放到 “自动收集” 目录中,便不能修改,如果修改,flume 会报错。此外,他不能有重名的文件,否则也会报错。
当一个文件被 flume 读了以后,会在末尾 添加 .COMPLETED 标识
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-spooldir.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=spooldir
# 目录需要提前建立
a1.sources.r1.spoolDir=/home/data
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
sequence generator source(序列发生源) 类型的 source
一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1.主要用来测试。测试消费能力。
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-seq.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=seq
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
http 类型的 source
此 source 接受 htpp 的 get 和 post 请求做为f lume 的事件。其中 get 方式应该只用于试验。
如果想让flume正确解析http协议信息,比如解析出请求头、请求体等信息,需要提供一个可插拔的 “处理器” 来将请求转换为事件对象,这个处理器必须实现 HTTPSourceHandler 接口。
这个处理器接受一个 HttpServletRequest 对象,并返回一个 Flume Event 对象集合。
Flume 提供了一些常用的 Handler(处理器)。
-
JSONHandler
可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32 字符集,该 handler 接受 Event 数组,并根据请求头中的编码将其转换位 Flume Event,如果没有指定的编码,默认编码为 UTF-8.
spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-http.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
启动测试
启动
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console
测试,从node103发送数据
curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888
node113接收数据
2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F hello hello }
Kafka 类型
flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据。flume-kafka-source 的offset是交由zk集群去维护offset。
flume 属于单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。
属性 | 默认值 | 描述 |
---|---|---|
channels | – | 配置的channels 可配置多个channels 后续文章会说到 |
type | – | org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | 配置kafka集群地址 |
kafka.consumer.group.id | flume | 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分 |
kafka.topics | – | 你需要消费的topic |
kafka.topics.regex | – | 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics ,如果存在则覆盖kafka.topics 。 |
batchSize | 1000 | 一批中写入Channel的最大消息数 (优化项) |
batchDurationMillis | 1000 | 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项) |
backoffSleepIncrement | 1000 | Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka 主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
maxBackoffSleep | 5000 | Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
useFlumeEventFormat | false | 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。 |
setTopicHeader | true | 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader 属性定义。 |
topicHeader | topic | 如果setTopicHeader 属性设置为true ,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader 属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 |
Other Kafka Consumer Properties | – | 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset
|
Kafka source 覆盖了两个 Kafka 消费者参数:source 将 auto.commit.enable 设置为“false”,以批次进行提交。Kafka source 保证至少一次消息检索策略。source 启动时可能会出现重复项。Kafka Source 还为key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默认值。不建议修改这些参数。
#1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 设置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批写入 Channel 的最大消息数
a1.sources.r1.batchSize=5000
# 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每2秒钟拉取 kafka 一批数据,批数据大小为5000放入到 flume-channels 中 。这两个值总和考虑以下两项:
- 需要配置kafka单条数据 broker.conf 中配置 message.max.bytes
- 当前flume channel sink 组件最大消费能力如何
文档地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
Taildir Source
在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。
这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
Taildir Source可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy