1.数据倾斜产生原因?
原因:由于数据本身原因或者是使用filter算子过滤后导致分区内数据量相差太大,有的分区多,有的分区少
解决方案:可以使用saprkWeb端口4040查看具体到哪个算子哪个分区shuffle文件大
1.1增加或者减少并行度
在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task的不同Key分配到不同Task,从而降低倾斜分区所需理的数据量,通过groupbykey(17)将Shnffle并行度调整为17,重新提交到Spark,或者将并行度减小也行。
1.2自定义分区
CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner
//覆盖分区数
override def numPartitions: Int = numParts
//覆盖分区号获取函数。
override def getPartition(key: Any): Int = {
val id: Int = key.toString.toInt
if (id <= 900000) {
return new java.util.Random().nextInt(100) % 12
}
else {
return id % 12
}
}
rdd.groupByKey(new CustomerPartitioner(12)).count
2.spark算子调优
2.1mapPartition
但是如果使用mapPartitions算子,但数据量非常大时function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会0OM,即内存溢出,因此,mapPartitions算子适用于数据量不是特别大的时候
2.2foreachPartition
在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能
2.3filter与coalesce的配合使用
在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤,每个分区的数据量有可能会存在较大差异
2.4repartition解决SparkSQL低并行度问题。
并行度的设置对于SparkSQL是不生效的,用户设置的并行度只对于SparkSQL以外的所有Spark的stage生效,SparkSQL的并行度不允许用户自己指定,SparkSQL自己会默认根据hive表对应的HDFS文件的split个数自动设置SparkSQL所在的那个stage的并行度,用户自己通spark.default.parallelism参数指定的并行度,只会在没SparkSQL的stage中生效
2.5reduceByKey替代groupByKey
3.saprk性能调优
3.1分配更多资源
spark-submit \
--class org.apache.spark.examples SparkPi \
--master yarn \ 实际生产环境一定使用yarn-cluster
--deploy-mode cluster \
--num-executors 50~100 \
--driver-memory 1G~6G \
--executor-memory 6G~10G \
--executor-cores 3 \ 一个核处理2~3个task
/root/apps/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples.jar \
100
3.2rdd优化
逻辑一样的RDD重用,对于多次使用的RDD需要持久化,减少filter操作
3.3 并行度调节
task核数的2~3倍
3.4广播大变量
广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少在初始阶段,广播变量只在Driver中有一份副本.task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量