flume与kafka集成遇到的问题与解决思路

0x00 背景知识

基本上想去用flume的同学都知道点flume的用途了。flume是一个分布式,可靠的,易用的,可以将不同源的日志进行,收集,汇总,或者存储的中间件。

0x01 使用场景

  • 数据来源:系统现有日志,有python脚本源源不断的从s3上拉下来,每10分钟拉一次,一次可能会拉取多个日志文件,视日志量而定,每个文件最大是10w行,超过会被分割。
  • 数据流转:需要及时将上面产生的日志发往kafka

0x02 flume的使用

flume支持三种不同的agent来发送数据,我这里比较符合的是spooldir这种方式.

  • 基本配置如下
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

0x03 遇到的问题

  • 文件读写报错

SpoolDir Source throws IllegalStateException: File has changed size since being read

运行后报上面的错,查了资料说是在flume读这个文件时,该文件不能被继续写入。改了数据生成逻辑,在没有写完成前,以.tmp结尾,写完后,再重命名去掉tmp后缀。涉及到的配置也比较简单

a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
  • kafka partition数据不均匀问题

发送kafka partition上面的数据不均匀,每次发送时,只往一个partition上面发,并没有同时往多个partition上面发。

查了资料,说是发送消息时不指定key将会随机发,但事实上,并没有。
这时,自己用python带的kafka python库直接发送测试,数据是均匀的。说明kafka集群是没问题的。这时候问题出在 kafka sink端。

事情到了这里,似乎需要正面刚这个问题了。
去flume官网下载源文件

  • 查看kafka sink源码,在flume-ng-kafka-sink这个文件夹下面,只有三个文件。
  • 查看最主要的文件 KafkaSink.java
  • 查看构造key的过程,核心代码如下
public static final String KEY_HDR = "key";

eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
messageList.add(data);

说明key是从event中拿到的,我们只需要在event中构造一个包含key为 key 的header 键值对就能达到目的。

事情到了这里,似乎只要搞定event中加key就可以搞定了。

查询官方文档,发现还有一个拦截器 Interceptor 的玩意儿。

flume默认提供了一些拦截器

  • Timestamp Interceptor
  • Host Interceptor
  • Static Interceptor
  • UUID Interceptor
  • Regex Filtering Interceptor
  • Regex Extractor Interceptor
  • Search and Replace Interceptor
  • orphline Interceptor
  • ...

我们需要一个能配置headerName的拦截器,找了一下,只有uuid拦截器符合要求。

a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key

加上上面二行,重启flume

/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

查看kafka-manager中的partition中message的分布,果然妥妥的均匀了。

完整的配置如下:

a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

使用的版本为flume 1.6

其实,真正没有随机的原因本文并没有直接去找到,只是另辟蹊径解决了问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容