flume的memeryChannel中transactionCapacity和sink的batchsize需要注意事项

最近在做flume的实时日志收集,用flume默认的配置后,发现不是完全实时的,于是看了一下,原来是memeryChannel的transactionCapacity在作怪,因为他默认是100,也就是说收集端的sink会在收集到了100条以后再去提交事务(即发送到下一个目的地),于是我修改了transactionCapacity到10,想看看是不是会更加实时一点,结果发现收集日志的agent启动的时候报错了。

16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.[Java](http://lib.csdn.net/base/javase "Java SE知识库"):96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

于是很纳闷,为什么默认值100可以,而设置10就会说小了呢,于是查阅资料,发现原来是sink的batchsize参数在作怪,下面,我就来理一理这个来龙去脉,这个sink的batchsize是什么意思呢,就是sink会一次从channel中取多少个event去发送,而这个发送是要最终以事务的形式去发送的,因此这个batchsize的event会传送到一个事务的缓存队列中(takeList),这是一个双向队列,这个队列可以在事务失败时进行回滚(也就是把取出来的数据吐memeryChannel的queue中),它的初始大小就是transactionCapacity定义的大小,源码中有: takeList = new LinkedBlockingDeque<Event>(transCapacity); 源码来自https://segmentfault.com/a/1190000003586635的分享。

再看这个错误抛出的地方:

 if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
 }

在上面的情况中,sink一次取100个events,塞到takelist中,在塞了10个后,就会引发上述异常,因此,这个错误的解决办法就是:在sink中,channel的transactionCapacity参数不能小于sink的batchsize。

=================================================================================

Flume-ng出现HDFS IO error,Callable timed out异常

摘自http://blog.csdn.net/wsscy2004/article/details/22179361

目前解决方案:

三、记Flume-NG一些注意事项

一、关于Source:

1、spool-source:适合静态文件,即文件本身不是动态变化的;

2、avro source可以适当提高线程数量来提高此source性能;

3、ThriftSource在使用时有个问题需要注意,使用批量操作时出现异常并不会打印异常内容而是"Thrift source %s could not append events to the channel.",这是因为源码中在出现异常时,它并未捕获异常而是获取组件名称,这是源码中的一个bug,也可以说明thrift很少有人用,否则这个问题也不会存在在很多版本中;

4、如果一个source对应多个channel,默认就是每个channel是同样的一份数据,会把这批数据复制N份发送到N个channel中,所以如果某个channel满了会影响整体的速度的哦;

5、ExecSource官方文档已经说明是异步的,可能会丢数据哦,尽量使用tail -F,注意是大写的;

二、关于Channel:

1、采集节点建议使用新的复合类型的SpillableMemoryChannel,汇总节点建议采用memory channel,具体还要看实际的数据量,一般每分钟数据量超过120MB大小的flume agent都建议用memory channel(自己测的file channel处理速率大概是2M/s,不同机器、不同环境可能不同,这里只提供参考),因为一旦此agent的channel出现溢出情况,将会导致大多数时间处于file channel(SpillableMemoryChannel本身是file channel的一个子类,而且复合channel会保证一定的event的顺序的使得读完内存中的数据后,再需要把溢出的拿走,可能这时内存已满又会溢出。。。),性能大大降低,汇总一旦成为这样后果可想而知;

2、调整memory 占用物理内存空间,需要两个参数byteCapacityBufferPercentage(默认是20)和byteCapacity(默认是JVM最大可用内存的0.8)来控制,计算公式是:byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize),很明显可以调节这两个参数来控制,至于byteCapacitySlotSize默认是100,将物理内存转换成槽(slot)数,这样易于管理,但是可能会浪费空间,至少我是这样想的。。。;

3、还有一个有用的参数"keep-alive"这个参数用来控制channel满时影响source的发送,channel空时影响sink的消费,就是等待时间,默认是3s,超过这个时间就甩异常,一般不需配置,但是有些情况很有用,比如你得场景是每分钟开头集中发一次数据,这时每分钟的开头量可能比较大,后面会越来越小,这时你可以调大这个参数,不至于出现channel满了得情况;

三、关于Sink:

1、avro sink的batch-size可以设置大一点,默认是100,增大会减少RPC次数,提高性能;

2、内置hdfs sink的解析时间戳来设置目录或者文件前缀非常损耗性能,因为是基于正则来匹配的,可以通过修改源码来替换解析时间功能来极大提升性能,稍后我会写一篇文章来专门说明这个问题;

3、RollingFileSink文件名不能自定义,而且不能定时滚动文件,只能按时间间隔滚动,可以自己定义sink,来做定时写文件;

4、hdfs sink的文件名中的时间戳部分不能省去,可增加前缀、后缀以及正在写的文件的前后缀等信息;"hdfs.idleTimeout"这个参数很有意义,指的是正在写的hdfs文件多长时间不更新就关闭文件,建议都配置上,比如你设置了解析时间戳存不同的目录、文件名,而且rollInterval=0、rollCount=0、rollSize=1000000,如果这个时间内的数据量达不到rollSize的要求而且后续的写入新的文件中了,就是一直打开,类似情景不注意的话可能很多;"hdfs.callTimeout"这个参数指的是每个hdfs操作(读、写、打开、关闭等)规定的最长操作时间,每个操作都会放入"hdfs.threadsPoolSize"指定的线程池中得一个线程来操作;

5、关于HBase sink(非异步hbase sink:AsyncHBaseSink),rowkey不能自定义,而且一个serializer只能写一列,一个serializer按正则匹配多个列,性能可能存在问题,建议自己根据需求写一个hbase sink;

6、avro sink可以配置failover和loadbalance,所用的组件和sinkgroup中的是一样的,而且也可以在此配置压缩选项,需要在avro source中配置解压缩;

四、关于SinkGroup:

1、不管是loadbalance或者是failover的多个sink需要共用一个channel;

2、loadbalance的多个sink如果都是直接输出到同一种设备,比如都是hdfs,性能并不会有明显增加,因为sinkgroup是单线程的它的process方法会轮流调用每个sink去channel中take数据,并确保处理正确,使得是顺序操作的,但是如果是发送到下一级的flume agent就不一样了,take操作是顺序的,但是下一级agent的写入操作是并行的,所以肯定是快的;

3、其实用loadbalance在一定意义上可以起到failover的作用,生产环境量大建议loadbalance;

五、关于监控monitor:

1、监控我这边做得还是比较少的,但是目前已知的有以下几种吧:cloudera manager(前提是你得安装CDH版本)、ganglia(这个天生就是支持的)、http(其实就是将统计信息jmx信息,封装成json串,使用jetty展示在浏览器中而已)、再一个就是自己实现收集监控信息,自己做(可以收集http的信息或者自己实现相应的接口实现自己的逻辑,具体可以参考我以前的博客);

2、简单说一下cloudera manager这种监控,最近在使用,确实很强大,可以查看实时的channel进出数据速率、channel实时容量、sink的出速率、source的入速率等等,图形化的东西确实很丰富很直观,可以提供很多flume agent整体运行情况的信息和潜在的一些信息;

六、关于flume启动:

1、flume组件启动顺序:channels——>sinks——>sources,关闭顺序:sources——>sinks——>channels;

2、自动加载配置文件功能,会先关闭所有组件,再重启所有组件;

3、关于AbstractConfigurationProvider中的Map<Class<? extends Channel>, Map<String, Channel>> channelCache这个对象,始终存储着agent中得所有channel对象,因为在动态加载时,channel中可能还有未消费完的数据,但是需要对channel重新配置,所以用以来缓存channel对象的所有数据及配置信息;

4、通过在启动命令中添加 "no-reload-conf"参数为true来取消自动加载配置文件功能;

七、关于interceptor:

八、关于自定义组件:sink、source、channel:

1、channel不建议自定义哦,这个要求比较高,其他俩都是框架式的开发,往指定的方法填充自己配置、启动、关闭、业务逻辑即可,以后有机会单独写一篇文章来介绍;

2、关于自定义组件请相信github,上面好多好多好多,可以直接用的自定义组件....;

九、关于Flume-NG集群网络拓扑方案:

1、在每台采集节点上部署一个flume agent,然后做一到多个汇总flume agent(loadbalance),采集只负责收集数据发往汇总,汇总可以写HDFS、HBase、spark、本地文件、kafka等等,这样一般修改会只在汇总,agent少,维护工作少;

2、采集节点没有部署flume agent,可能发往mongo、redis等,这时你需要自定义source或者使用sdk来将其中的数据取出并发往flume agent,这样agent就又可以充当“采集节点”或者汇总节点了,但是这样在前面相当于加了一层控制,就又多了一层风险;

3、由于能力有限,其它未知,上面两种,第一种好些,这里看看美团的架构———— 传送门

四、

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

sink是hdfs,然后使用目录自动生成功能。出现如题的错误,看官网文档说的是需要在每个文件记录行的开头需要有时间戳,但是时间戳的格式可能比较难调节,所以亦可设置hdfs.useLocalTimeStamp这个参数,比如以每个小时作为一个文件夹,那么配置应该是这样:

a1.sinks.k1.hdfs.path = hdfs://ubuntu:9000/flume/events/%y-%m-%d/%H  
a1.sinks.k1.hdfs.filePrefix = events-  
a1.sinks.k1.hdfs.round = true  
a1.sinks.k1.hdfs.roundValue = 1  
a1.sinks.k1.hdfs.roundUnit = hour  
a1.sinks.k1.hdfs.useLocalTimeStamp = true  

或者修改hdfs.timeZone这个参数使之可以和我们上传的log文件的日期格式一样应该就可以了,没有测试过。

五、flume学习(三):flume将log4j日志数据写入到hdfs

本次我们把log4j的日志直接采集输出到hdfs中去。需要修改flume.conf中sink的配置:

1.  tier1.sources=source1  
2.  tier1.channels=channel1  
3.  tier1.sinks=sink1  

5.  tier1.sources.source1.type=avro  
6.  tier1.sources.source1.bind=0.0.0.0  
7.  tier1.sources.source1.port=44444  
8.  tier1.sources.source1.channels=channel1  

10.  tier1.channels.channel1.type=memory  
11.  tier1.channels.channel1.capacity=10000  
12.  tier1.channels.channel1.transactionCapacity=1000  
13.  tier1.channels.channel1.keep-alive=30  

15.  tier1.sinks.sink1.type=hdfs  
16.  tier1.sinks.sink1.channel=channel1  
17.  tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events  
18.  tier1.sinks.sink1.hdfs.fileType=DataStream  
19.  tier1.sinks.sink1.hdfs.writeFormat=Text  
20.  tier1.sinks.sink1.hdfs.rollInterval=0  
21.  tier1.sinks.sink1.hdfs.rollSize=10240  
22.  tier1.sinks.sink1.hdfs.rollCount=0  
23.  tier1.sinks.sink1.hdfs.idleTimeout=60  

六、【Flume】【源码分析】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用?

解决方案

原因:flume配置问题或者说代码问题 ,文件滚动的判断条件存在漏洞

增加配置参数 ,即可按照参数滚动文件

flume1.sinks.sink1.hdfs.minBlockReplicas=1

参考:http://blog.csdn.net/simonchi/article/details/43231891

七、查看最终配置

(来源http://www.it610.com/article/2107322.htm

最终配置文件示例
# flume1 which ones we want to activate.
flume1.channels = ch1
flume1.sources = src1
flume1.sinks = sink1
# Define a memory channel called ch1 on flume1
flume1.channels.ch1.type = memory
flume1.channels.ch1.capacity = 100000
flume1.channels.ch1.transactionCapacity = 1000
flume1.channels.ch1.keep-alive = 30
# Define an Avro source called src1 on flume1 and tell it
# to bind to 0.0.0.0:8888\. Connect it to channel ch1.
flume1.sources.src1.channels = ch1
flume1.sources.src1.type = avro
flume1.sources.src1.bind = 0.0.0.0
flume1.sources.src1.port = 8888
flume1.sources.src1.threads = 5
flume1.sinks.sink1.type = hdfs
flume1.sinks.sink1.channel = ch1
flume1.sinks.sink1.hdfs.path =hdfs:``//master:9000/ysg/flume/ysg/%Y%m
flume1.sinks.sink1.hdfs.filePrefix = ysg
flume1.sinks.sink1.hdfs.fileSuffix = .log
flume1.sinks.sink1.hdfs.inUseSuffix = .tmp
flume1.sinks.sink1.hdfs.maxOpenFiles = 5000
flume1.sinks.sink1.hdfs.batchSize= 1
flume1.sinks.sink1.hdfs.fileType = DataStream
flume1.sinks.sink1.hdfs.writeFormat =Text
#flume1.sinks.sink1.hdfs.rollSize =64*1024*1024
flume1.sinks.sink1.hdfs.rollSize = 67108864
flume1.sinks.sink1.hdfs.rollCount = 0
flume1.sinks.sink1.hdfs.rollInterval = 0
flume1.sinks.sink1.hdfs.minBlockReplicas=1
flume1.sinks.sink1.hdfs.useLocalTimeStamp = ``true
flume1.sinks.sink1.hdfs.connect-timeout=80000
flume1.sinks.sink1.hdfs.callTimeout=120000
flume1.sinks.sink1.hdfs.idleTimeout = 60

八、 3.1 基础参数调优经验 --去掉 每写一行在行尾添加一个换行符 情况

  • HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,我们日志本身带有换行符,这样会导致每条日志后面多一个空行,修改配置不要自动添加换行符;
    lc.sinks.sink_hdfs.serializer.appendNewline = ``false
  • 调大MemoryChannel的capacity,尽量利用MemoryChannel快速的处理能力;

  • 调大HdfsSink的batchSize,增加吞吐量,减少hdfs的flush次数;

  • 适当调大HdfsSink的callTimeout,避免不必要的超时错误;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 面对以上的问题,我们如何将这些日志移动到hdfs集群上尼???? 第一种方案:使用shell脚本cp 文件,然后通...
    机灵鬼鬼阅读 1,376评论 1 1
  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 11,437评论 13 34
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 3,513评论 0 13
  • 这里主要介绍几种常见的日志的source来源,包括监控文件型,监控文件内容增量,TCP和HTTP。 Spool类型...
    欢醉阅读 1,381评论 0 10
  • 订阅 笔记 分层存储 HASH存储 命令 连接 命令说明 配置 常用 修改密码 允许远程访问 说明 安装 Ubun...
    喵王不瞌睡阅读 291评论 0 0