kafka压力测试
1.生产者压力测试
[shsxt@hadoop002 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop002:9092,hadoop003:9092,hadoop004:9092
100000 records sent, 31486.146096 records/sec (3.00 MB/sec), 1374.63 ms avg latency, 1699.00 ms max latency, 1469 ms 50th, 1666 ms 95th, 1694 ms 99th, 1698 ms 99.9th.
2.消费者压力测试
[shsxt@hadoop002 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper hadoop002:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2020-10-20 13:41:44:731, 2020-10-20 13:41:51:215, 9.5367, 1.4708, 100000, 15422.5787
PS:正常的集群应该达到100M/sec以上
kafka机器数量计算
Kafka机器数量(经验公式)=2(峰值生产速度副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量=2(502/100)+ 1=3台(20台以内的服务器,速率在50M/s以下的,配置3台kafka即可)
这是一个不断迭代的过程,配一下、测一下,最终确定合适的台数。
HDFS参数调优hdfs-site.xml
dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60(集群为6台时,参数可设置为50左右)
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。
YARN参数调优yarn-site.xml
(1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
(2)解决办法:
内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
NodeManager 8G =>要根据服务器实际内存配置:
单个任务默认内存也是8G(WordCount操作)
任务中有很多的MapTask和ReduceTask,假如数据是128M,则默认MT和RT为1G内存
如果是1G的数据量,MT内存设为8G(相应*8)+(RT=2G),一共是10G
(看map的聚合压缩情况,调整RT的内存大小)
Hadoop宕机
(1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
(2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。
以及脚本的写入和执行
说明1:
java -classpath 需要在jar包后面指定全类名;
java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。如果有可以用java -jar,如果没有就需要用到java -classpath
说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。(可以将一些无用的日志丢入进去)
flume集成kafka,设置上拦截器
1.实现接口
2.重写4个方法(初始化、单event、双event、close关闭资源)
event能获取到 body 和 header
ETL => body => 主要判断的就是 json数据 是否以 { 开头、以 } 结尾,还有服务器时间(长度13,全部是数字)
分类型拦截器 => body header => 根据body区分start或event类型,再把相应数据添加到头里面 topic,start_topic/event_topic
3.静态内部类Builder
new 对象
4.打包上传集群
PS:kafka的blockId必须不能重复
如果想要生成其他时间点的数据,一定要先停止全部集群,再去修改时间,否则会出现flume消费不到kafka数据的情况
(如果一定要造测试数据,就往以后的年月日去改,不要改成以前的年月日)
同步时间脚本
.# !/bin/bash
.# -t与sudo配套使用,虚拟终端的意思,下面是集群同步时间,非正式使用,一般是使用时间服务器
for i in hadoop002 hadoop003 hadoop004
do
echo "========== i "sudo date -s $1"
done
远程控制脚本
.# !/bin/bash
.# 远程登录,执行传入的所有命令
for i in hadoop002 hadoop003 hadoop004
do
echo "========== i "$*"
done
查看全部节点的进程
xcall.sh jps
超链接复制文件脚本
.#!/bin/bash
.#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if ((pcount==0)); then
echo no args;
exit;
fi
.#2 获取文件名称
p1=p1`
echo fname=$fname
.#3 获取上级目录到绝对路径
pdir=cd -P $(dirname $p1); pwd
echo pdir=$pdir
.#4 获取当前用户名称
user=whoami
.#5 循环
for host in hadoop002 hadoop003 hadoop004
do
echo ------------------- pdir/user@pdir
done
xsync
采集通道启动停止脚本
.#!/bin/bash
case $1 in
"start"){
echo "----------启动集群-----------"
/opt/module/hadoop-2.7.2/sbin/start-dfs.sh
ssh hadoop003 /opt/module/hadoop-2.7.2/sbin/start-yarn.sh
zk.sh start
sleep 5s;
f1.sh start
kf.sh start
sleep 8s;
f2.sh start
};;
"stop"){
echo "----------停止集群-----------"
f2.sh stop
sleep 5s;
kf.sh stop
sleep 10s;
f1.sh stop
zk.sh stop
ssh hadoop003 /opt/module/hadoop-2.7.2/sbin/stop-yarn.sh
/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
Hadoop相关总结
1)Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。需要掌握让LZO文件支持切片。
2)Hadoop常用端口号,50070,8088,19888,9000
3)Hadoop配置文件以及简单的Hadoop集群搭建。8个配置文件
4)HDFS读流程和写流程
5)MapReduce的Shuffle过程及Hadoop优化(包括:压缩、小文件、集群优化)
6)Yarn的Job提交流程
7)Yarn的默认调度器、调度器分类、以及他们之间的区别
8)HDFS存储多目录
9)Hadoop参数调优
10)项目经验之基准测试
Zookeeper相关总结
1)选举机制
半数机制,安装奇数台
10台服务器几台:3 台
20台服务器几台:5台
100台服务器几台:11台
不是越多越好,也不是越少越好。 如果多,通信时间长,效率低;如果太少,可靠性差。
2)常用命令
ls、get、create
Flume相关总结
1)Flume组成,Put事务,Take事务
Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
Kafka Channel:减少了Flume的Sink阶段,提高了传输效率。
Source到Channel是Put事务
Channel到Sink是Take事务
2)Flume拦截器
(1)拦截器注意事项
项目中自定义了:ETL拦截器和区分类型拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
(2)自定义拦截器步骤
a)实现 Interceptor
b)重写四个方法
initialize 初始化
public Event intercept(Event event) 处理单个Event
public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
close 方法
c)静态内部类,实现Interceptor.Builder