1 背景
随着微服务拆封和部署节点的增长,各服务日志非常分散,发现业务问题时,排查人员需要依次登录不同节点服务器逐一排查日志,现有方式存在查询定位问题耗时长,日志滚动日志丢失,服务器登录安全等问题,收集的业务服务日志量级非常巨大,截止现在每天约10TG日志量为此,我们引入日志平台,将分散的服务日志统一收集存储并提供平台进行查询,并做好方案性能测试与调优方案调研。
2 目标
完成对服务日志统一收集。
日志实时性不超过2小时。
单Agent节点QPS吞吐量>5000。
3 方案设计
3.1 架构设计
3.1.1 Agent
采集日志节点,部署在应用服务器。
3.1.1.1 Source
选择Taildir Source。
Windows系统不支持,对于部署在Windows服务程序(网上有魔改的tail source,不建议使用),需要使用Spooling Directory Source,并且保证监控目录不会出现正在写的文件。可通过日志配置或写脚本将日志文件MV到指定目录。
3.1.1.2 Channel
选择File Source。
Agent部署在业务服务服务器,从日志消息不丢失,尽可能不影响业务服务运行情况下决定使用File Source,Flume 1.7版本已存在与美团Dao Channel相似功能,资源限制条件下使用Memory Channel,超过资源限制使用File channel,但是官方建议此功能是实验性功能,当前版本不推荐在生产环境使用。
3.1.1.3 Sink
3.1.2 Collector Node
收集节点,单独服务器,可扩展。
3.1.2.1 Source
3.1.2.2 Channel
选择File Source。考虑与Agent Channel相同原因。
3.1.2.3 Sink
3.2 稳定性扩展方案
通过Load balancing Sink Processor,对Agent的 Kafka Sink进行负载均衡(负载策略round_robin)稳定性扩展, 由于flume在单线程中轮询,故此方案性能提升不明显, 仅提高file channel与Kfka Sink之间可靠性。
3.3 备用方案
之前sink kafka,kafka集群可能是瓶颈,此方案作为备选方案。
多个Agent顺序连接:将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑Failover的话,出现故障将影响整个Flow上的Agent收集服务。
4 性能测试
4.1 服务器列表
4.1.1 flume agent
10.66.221.98 4core 8G
4.1.2 zk+kafka
10.66.221.108 4core 4G
10.66.221.109 4core 4G
10.66.221.110 4core 4G
4.2 用例信息
选择生产环境10.66.8.31服务器CP日志用于测试(早上6点40左右高峰期日志)。
文件名:Aquila_DATA_201805220640_010368_043.log
日志来源服务器IP:10.66.8.31
服务器配置:4core + 4G
文件大小:~100M
总条数:162034 条 总时长: 37.5 秒
单条平均大小:100*1024*1024 / 162034 = ~647 byte
单个cp带宽: 100M / 37.5 = ~2.67M/s
单个cp每小时文件大小:~9.6G
单个cp每天文件大小:~230G
qps:162034 / 37.5 = ~4321 条
qpm:qps * 60 = ~259260条
qph:qpm * 60 = ~15555600条
以目前30个cp节点估算:
30qph= qph * 30 =~466668000条 (30个cp节点)
30个cp节点文件大小每小时=30qph *单条平均大小/ (1024 *1024*1024) = ~280G
4.2.1 flume测试配置
JVM 参数
JAVA_OPTS="-Xms1024m -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=20000 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
4.3 测试方案
主要针对flumeBatchSize参数调优。Flume启动最小
每组测试3次,取平均值。
sink kafka:
flumeBatchSize = 1000 flumeBatchSize = 10000
25s 20s
flume: 1G JVM内存
总文件大小:~1G
总条数:1637590
总时长:254 s
JVM MEM:~300M
TPS:1637590 / 254 = ~6447
4.4 测试结论
Flume Agent:Flume 1.7
Collector Node:CDH Flume 1.6
4.4.1 Flume Aget测试
文件名:Aquila_DATA_201805220640_010368_043.log</pre>
文件大小:~100M</pre>
总条数:162034条 </pre>
4.4.1.1 Source -> Channel
通过观察和统计metrics,发现接收162034 条消息约3秒,FileChannel并没有成为瓶颈。Channel -> Sink才是瓶颈关键。
4.4.1.2 Channel -> Sink
以下进行多组测试,取平均值。
file_roll sink:5s
avro sink:20s ChannelFillPercentage值范围:(0,1)</pre>
kafka sink:100s (配置已做优化,研发环境kafka集群配置低,具体线上观察调节kakfa JVM内存等条件)
4.4.1.3 Flume JVM
文件大小:~100M</pre>
总条数:162034 条</pre>
CPU:< 4%</pre>
MEM:<300M</pre>
将测试文件并发增大10倍。</pre>
文件大小:~100M * 10</pre>
总条数:162034条 * 10</pre>
CPU:< 10%</pre>
MEM:<400M</pre>
故目前生产环境Flume JVM分配1G远远满足。
4.4.2 Flume Collector Node 测试
在CDH创建一个Flume集群部署一个Agent 成为Collector Node。
4.4.2.1 Source -> Channel
kafka topic 分区=5
通过观察和统计metrics, channel 和sink 都能及时处理并实时落地HDFS, 此时Sink HDFS 稍微成为瓶颈, 后续可通过增加Cllector Node 方式增加并行处理能力。
此时Flume Cllector Node 内存~1G。
4.4.2.2 hdfs性能
4.5 测试总结
当前flume架构配置设计方案:
cp生产日志QPS为6447条/s。
6447 大于 4321,满足需求, 更多性能优化只能在生产环境实际调优。
5 消息格式验证
消息格式验证在Agent进行验证,不通过验证的消息直接丢弃,避免不合约定的消息进入kafka,减轻kafka集群压力。
6 监控方案(待定)
Flume包含以下几种监控方案, JMX Reporting,Ganglia Reporting不能同时配置。
Reporting,JSON Reporting,Ganglia Reporting, Custom Reporting。
也可向Zabbix上报监控数据。
Cllector Node 在CDH创建,可通过CDH监控。
6.1 JMX Reporting
JMX高爆可以在flume-env.sh文件修改JAVA_OPTS环境变量,可通过jvisualvm监控。
6.2 JSON Reporting
Flume可以通过JSON形式报告metrics,启用JSON形式,Flume需要配置一个端口。
6.3 Ganglia Reporting
Flume也可以报告metrics到Ganglia 3或者是Ganglia 3.1的metanodes。要将metrics报告到Ganglia,必须在启动的时候就支持Flume Agent。
6.4 Custom Reporting
自定义的监控需要实现org.apache.flume.instrumentation.MonitorService接口。例如有一个HTTP的监控类叫HttpReporting,我可以通过如下方式启动这个监控。
7 注意事项
当前CDH版flume为1.6版本,参数配置参考CDH Flume配置文档。https://archive.cloudera.com/cdh5/cdh/5/flume-ng/FlumeUserGuide.html#hdfs-sink
8 附录
8.1 FLume Agent配置
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = TAILDIR
agent1.sources.r1.positionFile = /flume/agent2/taildir_position.json
agent1.sources.r1.filegroups = f1
agent1.sources.r1.filegroups.f1 = /root/testlog1/Aquila_DATA_.*.log
agent1.sources.r1.batchSize = 1000
agent1.sources.r1.backoffSleepIncrement = 5000
agent1.sources.r1.maxBackoffSleep = 5000
agent1.sources.r1.channels = c1
#agent1.sources.r1.type = avro
#agent1.sources.r1.bind = 10.66.221.138
#agent1.sources.r1.port = 44444
#agent1.sources.r1.compression-type = deflate
#agent1.sources.r1.channels = c1
#agent1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
#agent1.sources.r1.kafka.bootstrap.servers=10.66.221.108:9092,10.66.221.109:9092,10.66.221.110:9092
#agent1.sources.r1.kafka.topics=cp-aquila-data
#agent1.sources.r1.kafka.consumer.group.id=flume_cp-aquila-data
#agent1.sources.r1.batchSize = 10000
#agent1.sources.r1.batchDurationMillis = 2000
#agent1.sources.r1.channels=c1
#agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#agent1.sinks.k1.kafka.topic = cp-aquila-data
#agent1.sinks.k1.kafka.bootstrap.servers = 10.66.221.108:9092,10.66.221.109:9092,10.66.221.110:9092
#agent1.sinks.k1.kafka.flumeBatchSize = 5000
#agent1.sinks.k1.kafka.producer.acks = 1
#agent1.sinks.k1.kafka.producer.linger.ms = 1
#agent1.sinks.k1.kafka.producer.max.request.size =10485760
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.type = file_roll
#agent1.sinks.k1.sink.directory = /root/flumefiles
#agent1.sinks.k1.sink.rollInterval = 0
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.hdfs.path=hdfs://v2-cdh03:8020/warehouse/applog/aquila/%Y%m%d
#agent1.sinks.k1.hdfs.filePrefix=applog_%Y%m%d_
#agent1.sinks.k1.hdfs.inUsePrefix=_
#agent1.sinks.k1.hdfs.rollSize=280000800
#agent1.sinks.k1.hdfs.rollInterval = 0
#agent1.sinks.k1.hdfs.rollCount=0
#agent1.sinks.k1.hdfs.round = true
#agent1.sinks.k1.hdfs.roundValue = 1
#agent1.sinks.k1.hdfs.roundUnit = hour
#agent1.sinks.k1.hdfs.proxyUser=flume
#agent1.sinks.k1.hdfs.fileType=DataStream
#agent1.sinks.k1.hdfs.batchSize=10000
#agent1.sinks.k1.hdfs.callTimeout=60000
#agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 10.66.221.138
agent1.sinks.k1.port = 44444
agent1.sinks.k1.connect-timeout = 200000
agent1.sinks.k1.compression-type = deflate
agent1.sinks.k1.channel = c1
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /flume/agent2/checkpoint
agent1.channels.c1.dataDirs = /flume/agent2/data
agent1.channels.c1.capacity = 10000000
agent1.channels.c1.transactionCapacity = 5000
#agent1.channels.c1.type=memory
#agent1.channels.c1.capacity =10000000
#agent1.channels.c1.transactionCapacity =5000
#agent1.channels.c1.keep-alive=30
#agent1.channels.c1.byteCapacityBufferPercentage=40
#agent1.channels.c1.byteCapacity=536870912
8.2 Flume metris
{
"SOURCE.src-1":{
"OpenConnectionCount":"0", //目前与客户端或sink保持连接的总数量(目前只有avro source展现该度量)
"Type":"SOURCE",
"AppendBatchAcceptedCount":"1355", //成功提交到channel的批次的总数量
"AppendBatchReceivedCount":"1355", //接收到事件批次的总数量
"EventAcceptedCount":"28286", //成功写出到channel的事件总数量,且source返回success给创建事件的sink或RPC客户端系统
"AppendReceivedCount":"0", //每批只有一个事件的事件总数量(与RPC调用中的一个append调用相等)
"StopTime":"0", //source停止时自Epoch以来的毫秒值时间
"StartTime":"1442566410435", //source启动时自Epoch以来的毫秒值时间
"EventReceivedCount":"28286", //目前为止source已经接收到的事件总数量
"AppendAcceptedCount":"0" //单独传入的事件到Channel且成功返回的事件总数量
},
"CHANNEL.ch-1":{
"EventPutSuccessCount":"28286", //成功写入channel且提交的事件总数量
"ChannelFillPercentage":"0.0", //channel满时的百分比
"Type":"CHANNEL",
"StopTime":"0", //channel停止时自Epoch以来的毫秒值时间
"EventPutAttemptCount":"28286", //Source尝试写入Channe的事件总数量
"ChannelSize":"0", //目前channel中事件的总数量
"StartTime":"1442566410326", //channel启动时自Epoch以来的毫秒值时间
"EventTakeSuccessCount":"28286", //sink成功读取的事件的总数量
"ChannelCapacity":"1000000", //channel的容量
"EventTakeAttemptCount":"313734329512" //sink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据
},
"SINK.sink-1":{
"Type":"SINK",
"ConnectionClosedCount":"0", //下一阶段或存储系统关闭的连接数量(如在HDFS中关闭一个文件)
"EventDrainSuccessCount":"28286", //sink成功写出到存储的事件总数量
"KafkaEventSendTimer":"482493",
"BatchCompleteCount":"0", //与最大批量尺寸相等的批量的数量
"ConnectionFailedCount":"0", //下一阶段或存储系统由于错误关闭的连接数量(如HDFS上一个新创建的文件因为超时而关闭)
"EventDrainAttemptCount":"0", //sink尝试写出到存储的事件总数量
"ConnectionCreatedCount":"0", //下一个阶段或存储系统创建的连接数量(如HDFS创建一个新文件)
"BatchEmptyCount":"0", //空的批量的数量,如果数量很大表示souce写数据比sink清理数据慢速度慢很多
"StopTime":"0",
"RollbackCount":"9", //
"StartTime":"1442566411897",
"BatchUnderflowCount":"0" //比sink配置使用的最大批量尺寸更小的批量的数量,如果该值很高也表示sink比souce更快
}
}