0x00 背景知识
基本上想去用flume的同学都知道点flume的用途了。flume是一个分布式,可靠的,易用的,可以将不同源的日志进行,收集,汇总,或者存储的中间件。
0x01 使用场景
- 数据来源:系统现有日志,有python脚本源源不断的从s3上拉下来,每10分钟拉一次,一次可能会拉取多个日志文件,视日志量而定,每个文件最大是10w行,超过会被分割。
- 数据流转:需要及时将上面产生的日志发往kafka
0x02 flume的使用
flume支持三种不同的agent来发送数据,我这里比较符合的是spooldir这种方式.
- 基本配置如下
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200
# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1
0x03 遇到的问题
- 文件读写报错
SpoolDir Source throws IllegalStateException: File has changed size since being read
运行后报上面的错,查了资料说是在flume读这个文件时,该文件不能被继续写入。改了数据生成逻辑,在没有写完成前,以.tmp结尾,写完后,再重命名去掉tmp后缀。涉及到的配置也比较简单
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
- kafka partition数据不均匀问题
发送kafka partition上面的数据不均匀,每次发送时,只往一个partition上面发,并没有同时往多个partition上面发。
查了资料,说是发送消息时不指定key将会随机发,但事实上,并没有。
这时,自己用python带的kafka python库直接发送测试,数据是均匀的。说明kafka集群是没问题的。这时候问题出在 kafka sink端。
事情到了这里,似乎需要正面刚这个问题了。
去flume官网下载源文件
- 查看kafka sink源码,在flume-ng-kafka-sink这个文件夹下面,只有三个文件。
- 查看最主要的文件 KafkaSink.java
- 查看构造key的过程,核心代码如下
public static final String KEY_HDR = "key";
eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
(eventTopic, eventKey, eventBody);
messageList.add(data);
说明key是从event中拿到的,我们只需要在event中构造一个包含key为 key 的header 键值对就能达到目的。
事情到了这里,似乎只要搞定event中加key就可以搞定了。
查询官方文档,发现还有一个拦截器 Interceptor 的玩意儿。
flume默认提供了一些拦截器
- Timestamp Interceptor
- Host Interceptor
- Static Interceptor
- UUID Interceptor
- Regex Filtering Interceptor
- Regex Extractor Interceptor
- Search and Replace Interceptor
- orphline Interceptor
- ...
我们需要一个能配置headerName的拦截器,找了一下,只有uuid拦截器符合要求。
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
加上上面二行,重启flume
/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
查看kafka-manager中的partition中message的分布,果然妥妥的均匀了。
完整的配置如下:
a1.sources = flume0
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200
# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1
使用的版本为flume 1.6
其实,真正没有随机的原因本文并没有直接去找到,只是另辟蹊径解决了问题。