spark调优{
1,资源调优{
a)搭建集群{
(1)配置目录在spark安装包中的conf下spark-env.sh
(2)spark_worker_cores
(3)spark_worker_memory
(4)spark_worker_instances 每台机器启动的worker数
}
b)提交任务{
--executor-cores
--executor-memory
--driver-cores
--driver-memory
--total-executor-cores
注意:
一个cores一般运行2-3个task,每个task一般可以处理1G数据
}
}
2,并行度调优{
i) sc.texFile(xxx,minumpartition) 本地读取一个文件,不写分区,默认是一个分区,按照32M来分partition,大于32M,则切分多个partition
ii) sc.parallelize(xxx,numpartition)
iii) sc.parallelizePairs(xxx,numpartitions)
iiii) sc.makeRDD(xxx,numpartitions)
iiiii) spark.default.parallelism 默认并行度
注意:
(1)集群中一个partition最多128M,分区数跟block的个数一样
(2)以上都用于测试常用
--以上不写第二个参数,默认的分区数需要看下
1,join(xxx,numpartition)与父RDD分区数多的一致,reduceByKey(xx,numpartition)不跟分区数与父RDD一致,groupByKey(xx,numpartition)
2,repartition(numpartition)/coalesce(numpartition,true)
3,自定义分区器,String取hash可能会有负数
4,读取kafka中的数据
i)Receiver spark.streaming.blockInterval=200
ii)direct 提高读取的topic的分区的个数
5,修改sparkSQL的spark.sql.shuffle.partitions--200的配置
}
3,代码调优{
(1)尽量避免重复创建RDD,尽量复用RDD
(2)对多次使用的RDD持久化
i.cache
ii.persist
iii.checkpoint
(3)尽量避免使用shuffle类算子,join=filter/MAP/FLATMAP+广播变量,当俩个RDD进行join时,可以将小的RDD回收到driver端,使用广播变量
(4)尽量使用map有预聚合的算子,reduceByKey ,groupByKey,combineByKey,aggregateByKey
好处:
i)减少shuffle write数据量
ii)减少shuffle read数据量
iii)减少reduce端聚合次数
(5)使用高性能的算子
i)使用reduceByKey 代替groupByKey
ii)使用mapPartition替代map
iii)使用foreachPartition替代foreach
iiii)使用filter后使用coalesce减少分区
iiiii)使用repartitionAndSortWithinPartitions替代repartition与sort类操作
(6)如果executor使用到了driver端的变量,可以使用广播变量
(7) 序列化性能高(kryo)conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{SpeedSortKey.class})
(8)优化数据结构尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
(9)使用fastutil库
}
4,数据本地化{
process_local
task处理的数据就在本进程所在的executor的内存中
node_local
task处理的数据就在本进程所在的节点磁盘中,或者处理的数据在本进程所在节点的其他executor内存中
no_perf
读取的数据在关系型数据库中
rack_local
task处理数据在同机架的其他节点磁盘上或者同机架其他节点的executor内存中
any
出机架
调优{
driver端发送task时,首先按照数据本地化,process_local发送task,默认每个task等待3s,重试,重试5次之后,如果还没有被执行,driver端会降级发送到node_local,同理,每个task等待3s重试5次之后,再将级别
•spark.locality.wait 3s 以下所有参数的默认值
• spark.locality.wait.process 3s
• spark.locality.wait.node 3s
• spark.locality.wait.rack 3s
调节task执行不能本末倒置,不能将所有的task都以process_local运行
}
}
5.内存调优{
原因:executor内存不足,导致GC,频繁的GC运行慢
1.调节executor中task的占用内存
i)减少shuffle聚合内存,spark.shuffle.memoryFraction 0.2
ii)减少RDD缓存和广播变量内存 spark.storage.memoryFraction 0.6
}
5,shuffle调优{
1.buffer大小——32KB
2.shuffle read拉取数据量的大小——48M
3.shuffle聚合内存的比例——20%
4.拉取数据重试次数——5次
5.重试间隔时间60s
6.Spark Shuffle的种类
7.SortShuffle bypass机制 200次
}
6,堆外内存调优{
}
7,数据倾斜解决{
数据倾斜是什么{
有shuffle就有数据倾斜
(1).HDFS 某一台节点上数据非常多,某些节点数据量少
(2).HIVE hive中某张表有数据倾斜,这张表中某列中的一个Key对应的数据量非常多,其他的key对应的数据量非常少
(3).MR/SPARK 某个task处理的数据量非常多,其他task处理的数据量非常少
}
解决数据倾斜{
1,使用HiveETL处理数据倾斜
i.场景:当spark经常对某一张有数据倾斜的Hive表处理
解决:将数据倾斜提前到hive中,生成一张没有数据倾斜的表,spark经常处理时,针对这张hive预聚合的表处理
治标不治本
2,过滤导致数据倾斜的key对计算影响不大
i.场景:导致数据倾斜的key对计算影响不大
解决:将导致数据倾斜的key过滤掉,这些key不参与计算
3,提高shuff并行度
场景:不同的key对应的和数据量多
解决:提高shuffl并行度
4,双重聚合
场景:同一个key对应的数据量多
解决:先sample抽样出现次数多的key,随机加前缀打散,聚合,再去前缀,再聚合,最后将结果合并起来
5,将Reduce join 转为 map join
场景:俩个RDD或者俩张表要join时,可以使用广播变量+transformation类算子代替join
解决:如果俩个RDD,某个RDD量比较小,可以将这个RDD结果回收到driver端,将这个结果广播到executor端,再对另外一个RDD使用没有shuffle的transformation算子操作
注意:
在sparkSql里,有个spark.sql.autoBroadcastJoinThreshold =10mb
如果俩个表join的时候,有一张表小于该参数,会自动将小表变广播变量
6,采样倾斜key并分拆join操作
场景:俩个RDD都比较大,某个RDD中有少量倾斜的key,无法使用方法5来解决
解决:将倾斜的key的RDD拆分成俩个RDD,一个是由倾斜的key组成的rdd,另外一个是没有倾斜的key组成的RDD,对另外的RDD做同样的拆分,没有key倾斜的俩个RDD正常join,有倾斜的key的RDD随机加前缀,另外的RDD膨胀,再去join,将原来由一个task处理一个key的情况,分散到多个task处理不同的key,join完成之后,去前缀,再与正常的join的结果union
7:使用随机前缀和扩容RDD进行join
场景:俩个RDD都比较大,某个RDD中有大量倾斜的key,无法使用方法5解决
解决:
直接数据有Key的RDD随机加前缀,对另外一个RDD膨胀,join
}
}
}