spark调优

spark调优{

    1,资源调优{

  a)搭建集群{

(1)配置目录在spark安装包中的conf下spark-env.sh

(2)spark_worker_cores

(3)spark_worker_memory

(4)spark_worker_instances  每台机器启动的worker数

  }


  b)提交任务{

  --executor-cores

  --executor-memory

  --driver-cores

  --driver-memory

  --total-executor-cores


注意:

一个cores一般运行2-3个task,每个task一般可以处理1G数据

  }

}

2,并行度调优{

i) sc.texFile(xxx,minumpartition)  本地读取一个文件,不写分区,默认是一个分区,按照32M来分partition,大于32M,则切分多个partition

ii) sc.parallelize(xxx,numpartition)

iii) sc.parallelizePairs(xxx,numpartitions)

iiii) sc.makeRDD(xxx,numpartitions)

iiiii) spark.default.parallelism  默认并行度

注意:

(1)集群中一个partition最多128M,分区数跟block的个数一样

(2)以上都用于测试常用

--以上不写第二个参数,默认的分区数需要看下

1,join(xxx,numpartition)与父RDD分区数多的一致,reduceByKey(xx,numpartition)不跟分区数与父RDD一致,groupByKey(xx,numpartition)

2,repartition(numpartition)/coalesce(numpartition,true)

3,自定义分区器,String取hash可能会有负数

4,读取kafka中的数据

i)Receiver spark.streaming.blockInterval=200

ii)direct 提高读取的topic的分区的个数

5,修改sparkSQL的spark.sql.shuffle.partitions--200的配置

}

3,代码调优{

(1)尽量避免重复创建RDD,尽量复用RDD

(2)对多次使用的RDD持久化

i.cache

ii.persist

iii.checkpoint

(3)尽量避免使用shuffle类算子,join=filter/MAP/FLATMAP+广播变量,当俩个RDD进行join时,可以将小的RDD回收到driver端,使用广播变量

(4)尽量使用map有预聚合的算子,reduceByKey ,groupByKey,combineByKey,aggregateByKey

好处:

i)减少shuffle write数据量

ii)减少shuffle read数据量

iii)减少reduce端聚合次数

(5)使用高性能的算子

i)使用reduceByKey 代替groupByKey

ii)使用mapPartition替代map

iii)使用foreachPartition替代foreach

iiii)使用filter后使用coalesce减少分区

iiiii)使用repartitionAndSortWithinPartitions替代repartition与sort类操作

(6)如果executor使用到了driver端的变量,可以使用广播变量

(7) 序列化性能高(kryo)conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{SpeedSortKey.class})

(8)优化数据结构尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

(9)使用fastutil库

}

4,数据本地化{

process_local 

task处理的数据就在本进程所在的executor的内存中

node_local

task处理的数据就在本进程所在的节点磁盘中,或者处理的数据在本进程所在节点的其他executor内存中

no_perf

读取的数据在关系型数据库中

rack_local

task处理数据在同机架的其他节点磁盘上或者同机架其他节点的executor内存中

any

出机架

调优{

driver端发送task时,首先按照数据本地化,process_local发送task,默认每个task等待3s,重试,重试5次之后,如果还没有被执行,driver端会降级发送到node_local,同理,每个task等待3s重试5次之后,再将级别

•spark.locality.wait  3s  以下所有参数的默认值

• spark.locality.wait.process 3s

• spark.locality.wait.node 3s

• spark.locality.wait.rack 3s

调节task执行不能本末倒置,不能将所有的task都以process_local运行

}

}

5.内存调优{

原因:executor内存不足,导致GC,频繁的GC运行慢

1.调节executor中task的占用内存

i)减少shuffle聚合内存,spark.shuffle.memoryFraction 0.2

ii)减少RDD缓存和广播变量内存 spark.storage.memoryFraction 0.6

}

5,shuffle调优{

1.buffer大小——32KB

2.shuffle read拉取数据量的大小——48M

3.shuffle聚合内存的比例——20%

4.拉取数据重试次数——5次

5.重试间隔时间60s

6.Spark Shuffle的种类

7.SortShuffle bypass机制 200次

}

6,堆外内存调优{

}

7,数据倾斜解决{

数据倾斜是什么{

  有shuffle就有数据倾斜

(1).HDFS  某一台节点上数据非常多,某些节点数据量少

(2).HIVE hive中某张表有数据倾斜,这张表中某列中的一个Key对应的数据量非常多,其他的key对应的数据量非常少

(3).MR/SPARK 某个task处理的数据量非常多,其他task处理的数据量非常少

}

解决数据倾斜{

1,使用HiveETL处理数据倾斜

i.场景:当spark经常对某一张有数据倾斜的Hive表处理

解决:将数据倾斜提前到hive中,生成一张没有数据倾斜的表,spark经常处理时,针对这张hive预聚合的表处理

治标不治本

2,过滤导致数据倾斜的key对计算影响不大

i.场景:导致数据倾斜的key对计算影响不大

解决:将导致数据倾斜的key过滤掉,这些key不参与计算

3,提高shuff并行度

场景:不同的key对应的和数据量多

解决:提高shuffl并行度

4,双重聚合

场景:同一个key对应的数据量多

解决:先sample抽样出现次数多的key,随机加前缀打散,聚合,再去前缀,再聚合,最后将结果合并起来

5,将Reduce join 转为 map join

场景:俩个RDD或者俩张表要join时,可以使用广播变量+transformation类算子代替join

解决:如果俩个RDD,某个RDD量比较小,可以将这个RDD结果回收到driver端,将这个结果广播到executor端,再对另外一个RDD使用没有shuffle的transformation算子操作

注意:

在sparkSql里,有个spark.sql.autoBroadcastJoinThreshold =10mb

如果俩个表join的时候,有一张表小于该参数,会自动将小表变广播变量

6,采样倾斜key并分拆join操作

场景:俩个RDD都比较大,某个RDD中有少量倾斜的key,无法使用方法5来解决

解决:将倾斜的key的RDD拆分成俩个RDD,一个是由倾斜的key组成的rdd,另外一个是没有倾斜的key组成的RDD,对另外的RDD做同样的拆分,没有key倾斜的俩个RDD正常join,有倾斜的key的RDD随机加前缀,另外的RDD膨胀,再去join,将原来由一个task处理一个key的情况,分散到多个task处理不同的key,join完成之后,去前缀,再与正常的join的结果union

7:使用随机前缀和扩容RDD进行join

场景:俩个RDD都比较大,某个RDD中有大量倾斜的key,无法使用方法5解决

解决:

直接数据有Key的RDD随机加前缀,对另外一个RDD膨胀,join

}

}

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容