Spark调优

1.数据倾斜产生原因?

  • 原因:由于数据本身原因或者是使用filter算子过滤后导致分区内数据量相差太大,有的分区多,有的分区少

  • 解决方案:可以使用saprkWeb端口4040查看具体到哪个算子哪个分区shuffle文件大

1.1增加或者减少并行度

在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task的不同Key分配到不同Task,从而降低倾斜分区所需理的数据量,通过groupbykey(17)将Shnffle并行度调整为17,重新提交到Spark,或者将并行度减小也行。

1.2自定义分区

CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner
        //覆盖分区数
        override def numPartitions: Int = numParts
        //覆盖分区号获取函数。
        override def getPartition(key: Any): Int = {
          val id: Int = key.toString.toInt
          if (id <= 900000) {
            return new java.util.Random().nextInt(100) % 12
          }
          else {
            return id % 12
          }
        }
        rdd.groupByKey(new CustomerPartitioner(12)).count

2.spark算子调优

2.1mapPartition

但是如果使用mapPartitions算子,但数据量非常大时function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会0OM,即内存溢出,因此,mapPartitions算子适用于数据量不是特别大的时候

2.2foreachPartition

在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能

2.3filter与coalesce的配合使用

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤,每个分区的数据量有可能会存在较大差异

2.4repartition解决SparkSQL低并行度问题。

并行度的设置对于SparkSQL是不生效的,用户设置的并行度只对于SparkSQL以外的所有Spark的stage生效,SparkSQL的并行度不允许用户自己指定,SparkSQL自己会默认根据hive表对应的HDFS文件的split个数自动设置SparkSQL所在的那个stage的并行度,用户自己通spark.default.parallelism参数指定的并行度,只会在没SparkSQL的stage中生效

2.5reduceByKey替代groupByKey

3.saprk性能调优

3.1分配更多资源

        spark-submit \
        --class org.apache.spark.examples SparkPi \
        --master yarn \            实际生产环境一定使用yarn-cluster
        --deploy-mode cluster \
        --num-executors 50~100 \
        --driver-memory 1G~6G \
        --executor-memory 6G~10G \
        --executor-cores 3 \       一个核处理2~3个task
        /root/apps/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples.jar \
        100

3.2rdd优化

逻辑一样的RDD重用,对于多次使用的RDD需要持久化,减少filter操作

3.3 并行度调节

task核数的2~3倍

3.4广播大变量

广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少在初始阶段,广播变量只在Driver中有一份副本.task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量

3.5调节本地化时长

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。