Flume日志收集方案设计和测试

1 背景

随着微服务拆封和部署节点的增长,各服务日志非常分散,发现业务问题时,排查人员需要依次登录不同节点服务器逐一排查日志,现有方式存在查询定位问题耗时长,日志滚动日志丢失,服务器登录安全等问题,收集的业务服务日志量级非常巨大,截止现在每天约10TG日志量为此,我们引入日志平台,将分散的服务日志统一收集存储并提供平台进行查询,并做好方案性能测试与调优方案调研。

2 目标

完成对服务日志统一收集。
日志实时性不超过2小时。
单Agent节点QPS吞吐量>5000。

3 方案设计

3.1 架构设计

image.png

3.1.1 Agent

采集日志节点,部署在应用服务器。

3.1.1.1 Source

选择Taildir Source。


image.png

Windows系统不支持,对于部署在Windows服务程序(网上有魔改的tail source,不建议使用),需要使用Spooling Directory Source,并且保证监控目录不会出现正在写的文件。可通过日志配置或写脚本将日志文件MV到指定目录。

3.1.1.2 Channel

选择File Source。

Agent部署在业务服务服务器,从日志消息不丢失,尽可能不影响业务服务运行情况下决定使用File Source,Flume 1.7版本已存在与美团Dao Channel相似功能,资源限制条件下使用Memory Channel,超过资源限制使用File channel,但是官方建议此功能是实验性功能,当前版本不推荐在生产环境使用。

image.png

3.1.1.3 Sink

3.1.2 Collector Node

收集节点,单独服务器,可扩展。

3.1.2.1 Source

3.1.2.2 Channel

选择File Source。考虑与Agent Channel相同原因。

3.1.2.3 Sink

3.2 稳定性扩展方案

通过Load balancing Sink Processor,对Agent的 Kafka Sink进行负载均衡(负载策略round_robin)稳定性扩展, 由于flume在单线程中轮询,故此方案性能提升不明显, 仅提高file channel与Kfka Sink之间可靠性。

image.png

3.3 备用方案

之前sink kafka,kafka集群可能是瓶颈,此方案作为备选方案。

多个Agent顺序连接:将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑Failover的话,出现故障将影响整个Flow上的Agent收集服务。

image.png

4 性能测试

4.1 服务器列表

4.1.1 flume agent

10.66.221.98 4core 8G

4.1.2 zk+kafka

10.66.221.108 4core 4G

10.66.221.109 4core 4G

10.66.221.110 4core 4G

4.2 用例信息

选择生产环境10.66.8.31服务器CP日志用于测试(早上6点40左右高峰期日志)。


文件名:Aquila_DATA_201805220640_010368_043.log

日志来源服务器IP:10.66.8.31

服务器配置:4core + 4G

文件大小:~100M

总条数:162034 条   总时长: 37.5 秒

单条平均大小:100*1024*1024 / 162034 = ~647 byte

单个cp带宽: 100M / 37.5 = ~2.67M/s

单个cp每小时文件大小:~9.6G

单个cp每天文件大小:~230G

qps:162034 / 37.5 = ~4321 条

qpm:qps * 60 = ~259260条

qph:qpm * 60 = ~15555600条

以目前30个cp节点估算:

30qph= qph * 30 =~466668000条 (30个cp节点)

30个cp节点文件大小每小时=30qph *单条平均大小/ (1024 *1024*1024) = ~280G

4.2.1 flume测试配置

JVM 参数

JAVA_OPTS="-Xms1024m -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=20000 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"

4.3 测试方案

主要针对flumeBatchSize参数调优。Flume启动最小

每组测试3次,取平均值。


sink kafka:

flumeBatchSize = 1000       flumeBatchSize = 10000

25s                                                    20s

flume: 1G JVM内存

总文件大小:~1G

总条数:1637590 

总时长:254 s

JVM MEM:~300M

TPS:1637590 / 254 = ~6447

4.4 测试结论

Flume Agent:Flume 1.7
Collector Node:CDH Flume 1.6

4.4.1 Flume Aget测试

文件名:Aquila_DATA_201805220640_010368_043.log</pre>
文件大小:~100M</pre>
总条数:162034条 </pre>

4.4.1.1 Source -> Channel

通过观察和统计metrics,发现接收162034 条消息约3秒,FileChannel并没有成为瓶颈。Channel -> Sink才是瓶颈关键。

4.4.1.2 Channel -> Sink

以下进行多组测试,取平均值。

file_roll sink:5s
avro sink:20s ChannelFillPercentage值范围:(0,1)</pre>
kafka sink:100s (配置已做优化,研发环境kafka集群配置低,具体线上观察调节kakfa JVM内存等条件)

image.png

4.4.1.3 Flume JVM

文件大小:~100M</pre>
总条数:162034 条</pre>
CPU:< 4%</pre>
MEM:<300M</pre>

image.png

将测试文件并发增大10倍。</pre>
文件大小:~100M * 10</pre>
总条数:162034条 * 10</pre>
CPU:< 10%</pre>
MEM:<400M</pre>

image.png

故目前生产环境Flume JVM分配1G远远满足。

image.png

4.4.2 Flume Collector Node 测试

在CDH创建一个Flume集群部署一个Agent 成为Collector Node。

4.4.2.1 Source -> Channel

kafka topic 分区=5

通过观察和统计metrics, channel 和sink 都能及时处理并实时落地HDFS, 此时Sink HDFS 稍微成为瓶颈, 后续可通过增加Cllector Node 方式增加并行处理能力。

image.png

此时Flume Cllector Node 内存~1G。

image.png

4.4.2.2 hdfs性能

image.png

4.5 测试总结

当前flume架构配置设计方案:

cp生产日志QPS为6447条/s。

6447 大于 4321,满足需求, 更多性能优化只能在生产环境实际调优。

5 消息格式验证

消息格式验证在Agent进行验证,不通过验证的消息直接丢弃,避免不合约定的消息进入kafka,减轻kafka集群压力。

6 监控方案(待定)

Flume包含以下几种监控方案, JMX Reporting,Ganglia Reporting不能同时配置。

Reporting,JSON Reporting,Ganglia Reporting, Custom Reporting。

也可向Zabbix上报监控数据。

Cllector Node 在CDH创建,可通过CDH监控。

6.1 JMX Reporting

JMX高爆可以在flume-env.sh文件修改JAVA_OPTS环境变量,可通过jvisualvm监控。

6.2 JSON Reporting

Flume可以通过JSON形式报告metrics,启用JSON形式,Flume需要配置一个端口。

6.3 Ganglia Reporting

Flume也可以报告metrics到Ganglia 3或者是Ganglia 3.1的metanodes。要将metrics报告到Ganglia,必须在启动的时候就支持Flume Agent。

6.4 Custom Reporting

自定义的监控需要实现org.apache.flume.instrumentation.MonitorService接口。例如有一个HTTP的监控类叫HttpReporting,我可以通过如下方式启动这个监控。

7 注意事项

当前CDH版flume为1.6版本,参数配置参考CDH Flume配置文档。https://archive.cloudera.com/cdh5/cdh/5/flume-ng/FlumeUserGuide.html#hdfs-sink

8 附录

8.1 FLume Agent配置

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

agent1.sources.r1.type = TAILDIR
agent1.sources.r1.positionFile = /flume/agent2/taildir_position.json
agent1.sources.r1.filegroups = f1
agent1.sources.r1.filegroups.f1 = /root/testlog1/Aquila_DATA_.*.log
agent1.sources.r1.batchSize = 1000
agent1.sources.r1.backoffSleepIncrement = 5000
agent1.sources.r1.maxBackoffSleep = 5000
agent1.sources.r1.channels = c1

#agent1.sources.r1.type = avro
#agent1.sources.r1.bind = 10.66.221.138
#agent1.sources.r1.port = 44444
#agent1.sources.r1.compression-type = deflate
#agent1.sources.r1.channels = c1

#agent1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
#agent1.sources.r1.kafka.bootstrap.servers=10.66.221.108:9092,10.66.221.109:9092,10.66.221.110:9092
#agent1.sources.r1.kafka.topics=cp-aquila-data
#agent1.sources.r1.kafka.consumer.group.id=flume_cp-aquila-data

#agent1.sources.r1.batchSize = 10000
#agent1.sources.r1.batchDurationMillis = 2000
#agent1.sources.r1.channels=c1
#agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#agent1.sinks.k1.kafka.topic = cp-aquila-data
#agent1.sinks.k1.kafka.bootstrap.servers = 10.66.221.108:9092,10.66.221.109:9092,10.66.221.110:9092
#agent1.sinks.k1.kafka.flumeBatchSize = 5000
#agent1.sinks.k1.kafka.producer.acks = 1
#agent1.sinks.k1.kafka.producer.linger.ms = 1
#agent1.sinks.k1.kafka.producer.max.request.size =10485760
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.type = file_roll

#agent1.sinks.k1.sink.directory = /root/flumefiles
#agent1.sinks.k1.sink.rollInterval = 0
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.hdfs.path=hdfs://v2-cdh03:8020/warehouse/applog/aquila/%Y%m%d
#agent1.sinks.k1.hdfs.filePrefix=applog_%Y%m%d_
#agent1.sinks.k1.hdfs.inUsePrefix=_
#agent1.sinks.k1.hdfs.rollSize=280000800
#agent1.sinks.k1.hdfs.rollInterval = 0
#agent1.sinks.k1.hdfs.rollCount=0
#agent1.sinks.k1.hdfs.round = true
#agent1.sinks.k1.hdfs.roundValue = 1
#agent1.sinks.k1.hdfs.roundUnit = hour
#agent1.sinks.k1.hdfs.proxyUser=flume
#agent1.sinks.k1.hdfs.fileType=DataStream
#agent1.sinks.k1.hdfs.batchSize=10000
#agent1.sinks.k1.hdfs.callTimeout=60000
#agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 10.66.221.138
agent1.sinks.k1.port = 44444
agent1.sinks.k1.connect-timeout = 200000
agent1.sinks.k1.compression-type = deflate
agent1.sinks.k1.channel = c1
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /flume/agent2/checkpoint
agent1.channels.c1.dataDirs = /flume/agent2/data
agent1.channels.c1.capacity = 10000000
agent1.channels.c1.transactionCapacity = 5000
#agent1.channels.c1.type=memory
#agent1.channels.c1.capacity =10000000
#agent1.channels.c1.transactionCapacity =5000
#agent1.channels.c1.keep-alive=30
#agent1.channels.c1.byteCapacityBufferPercentage=40
#agent1.channels.c1.byteCapacity=536870912

8.2 Flume metris

  {

  "SOURCE.src-1":{

    "OpenConnectionCount":"0",    //目前与客户端或sink保持连接的总数量(目前只有avro source展现该度量)

    "Type":"SOURCE",          

    "AppendBatchAcceptedCount":"1355",  //成功提交到channel的批次的总数量

    "AppendBatchReceivedCount":"1355",  //接收到事件批次的总数量

    "EventAcceptedCount":"28286", //成功写出到channel的事件总数量,且source返回success给创建事件的sink或RPC客户端系统

    "AppendReceivedCount":"0",    //每批只有一个事件的事件总数量(与RPC调用中的一个append调用相等)

    "StopTime":"0",     //source停止时自Epoch以来的毫秒值时间

    "StartTime":"1442566410435",  //source启动时自Epoch以来的毫秒值时间

    "EventReceivedCount":"28286", //目前为止source已经接收到的事件总数量

    "AppendAcceptedCount":"0"   //单独传入的事件到Channel且成功返回的事件总数量

  },

  "CHANNEL.ch-1":{

    "EventPutSuccessCount":"28286", //成功写入channel且提交的事件总数量

    "ChannelFillPercentage":"0.0",  //channel满时的百分比

    "Type":"CHANNEL",

    "StopTime":"0",     //channel停止时自Epoch以来的毫秒值时间

    "EventPutAttemptCount":"28286", //Source尝试写入Channe的事件总数量

    "ChannelSize":"0",      //目前channel中事件的总数量

    "StartTime":"1442566410326",  //channel启动时自Epoch以来的毫秒值时间

    "EventTakeSuccessCount":"28286",  //sink成功读取的事件的总数量

    "ChannelCapacity":"1000000", //channel的容量

    "EventTakeAttemptCount":"313734329512" //sink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据

  },

  "SINK.sink-1":{

    "Type":"SINK",

    "ConnectionClosedCount":"0",  //下一阶段或存储系统关闭的连接数量(如在HDFS中关闭一个文件)

    "EventDrainSuccessCount":"28286", //sink成功写出到存储的事件总数量

    "KafkaEventSendTimer":"482493",   

    "BatchCompleteCount":"0",   //与最大批量尺寸相等的批量的数量

    "ConnectionFailedCount":"0",  //下一阶段或存储系统由于错误关闭的连接数量(如HDFS上一个新创建的文件因为超时而关闭)

    "EventDrainAttemptCount":"0", //sink尝试写出到存储的事件总数量

    "ConnectionCreatedCount":"0", //下一个阶段或存储系统创建的连接数量(如HDFS创建一个新文件)

    "BatchEmptyCount":"0",    //空的批量的数量,如果数量很大表示souce写数据比sink清理数据慢速度慢很多

    "StopTime":"0",     

    "RollbackCount":"9",      //

    "StartTime":"1442566411897",

    "BatchUnderflowCount":"0"   //比sink配置使用的最大批量尺寸更小的批量的数量,如果该值很高也表示sink比souce更快

  }

  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容