spark调优【转】

【Spark集群并行度】

在Spark集群环境下,只有足够高的并行度才能使系统资源得到充分的利用,可以通过修改spark-env.sh来调整Executor的数量和使用资源,Standalone和YARN方式资源的调度管理是不同的。

在Standalone模式下:

1. 每个节点使用的最大内存数:SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY;

2. 每个节点的最大并发task数:SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES。

在YARN模式下:

1. 集群task并行度:SPARK_ EXECUTOR_INSTANCES* SPARK_EXECUTOR_CORES;

2. 集群内存总量:(executor个数) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+(SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)。

重点强调:Spark对Executor和Driver额外添加堆内存大小,Executor端:由spark.yarn.executor.memoryOverhead设置,默认值executorMemory * 0.07与384的最大值。Driver端:由spark.yarn.driver.memoryOverhead设置,默认值driverMemory * 0.07与384的最大值。

通过调整上述参数,可以提高集群并行度,让系统同时执行的任务更多,那么对于相同的任务,并行度高了,可以减少轮询次数。举例说明:如果一个stage有100task,并行度为50,那么执行完这次任务,需要轮询两次才能完成,如果并行度为100,那么一次就可以了。

但是在资源相同的情况,并行度高了,相应的Executor内存就会减少,所以需要根据实际实况协调内存和core。此外,Spark能够非常有效的支持短时间任务(例如:200ms),因为会对所有的任务复用JVM,这样能减小任务启动的消耗,Standalone模式下,core可以允许1-2倍于物理core的数量进行超配。

【Spark任务数量调整】

Spark的任务数由stage中的起始的所有RDD的partition之和数量决定,所以需要了解每个RDD的partition的计算方法。以Spark应用从HDFS读取数据为例,HadoopRDD的partition切分方法完全继承于MapReduce中的FileInputFormat,具体的partition数量由HDFS的块大小、mapred.min.split.size的大小、文件的压缩方式等多个因素决定,详情需要参见FileInputFormat的代码。

【Spark内存调优】

内存优化有三个方面的考虑:对象所占用的内存,访问对象的消耗以及垃圾回收所占用的开销。

1. 对象所占内存,优化数据结构

Spark 默认使用Java序列化对象,虽然Java对象的访问速度更快,但其占用的空间通常比其内部的属性数据大2-5倍。为了减少内存的使用,减少Java序列化后的额外开销,下面列举一些Spark官网(http://spark.apache.org/docs/latest/tuning.html#tuning-data-structures)提供的方法。

(1)使用对象数组以及原始类型(primitive type)数组以替代Java或者Scala集合类(collection class)。fastutil 库为原始数据类型提供了非常方便的集合类,且兼容Java标准类库。

(2)尽可能地避免采用含有指针的嵌套数据结构来保存小对象。

(3)考虑采用数字ID或者枚举类型以便替代String类型的主键。

(4)如果内存少于32GB,设置JVM参数-XX:+UseCom-pressedOops以便将8字节指针修改成4字节。与此同时,在Java 7或者更高版本,设置JVM参数-XX:+UseC—–ompressedStrings以便采用8比特来编码每一个ASCII字符。

2. 内存回收

(1)获取内存统计信息:优化内存前需要了解集群的内存回收频率、内存回收耗费时间等信息,可以在spark-env.sh中设置SPARK_JAVA_OPTS=“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps $ SPARK_JAVA_OPTS”来获取每一次内存回收的信息。

(2)优化缓存大小:默认情况Spark采用运行内存(spark.executor.memory)的60%来进行RDD缓存。这表明在任务执行期间,有40%的内存可以用来进行对象创建。如果任务运行速度变慢且JVM频繁进行内存回收,或者内存空间不足,那么降低缓存大小设置可以减少内存消耗,可以降低spark.storage.memoryFraction的大小。

3. 频繁GC或者OOM

针对这种情况,首先要确定现象是发生在Driver端还是在Executor端,然后在分别处理。

Driver端:通常由于计算过大的结果集被回收到Driver端导致,需要调大Driver端的内存解决,或者进一步减少结果集的数量。

Executor端:

(1)以外部数据作为输入的Stage:这类Stage中出现GC通常是因为在Map侧进行map-side-combine时,由于group过多引起的。解决方法可以增加partition的数量(即task的数量)来减少每个task要处理的数据,来减少GC的可能性。

(2)以shuffle作为输入的Stage:这类Stage中出现GC的通常原因也是和shuffle有关,常见原因是某一个或多个group的数据过多,也就是所谓的数据倾斜,最简单的办法就是增加shuffle的task数量,比如在SparkSQL中设置SET spark.sql.shuffle.partitions=400,如果调大shuffle的task无法解决问题,说明你的数据倾斜很严重,某一个group的数据远远大于其他的group,需要你在业务逻辑上进行调整,预先针对较大的group做单独处理。

【修改序列化】

使用Kryo序列化,因为Kryo序列化结果比Java标准序列化更小,更快速。具体方法:spark-default.conf 里设置spark.serializer为org.apache.spark.serializer.KryoSerializer 。

参考官方文档(http://spark.apache.org/docs/latest/tuning.html#summary):对于大多数程序而言,采用Kryo框架以及序列化能够解决性能相关的大部分问题。

【Spark 磁盘调优】

在集群环境下,如果数据分布不均匀,造成节点间任务分布不均匀,也会导致节点间源数据不必要的网络传输,从而大大影响系统性能,那么对于磁盘调优最好先将数据资源分布均匀。除此之外,还可以对源数据做一定的处理:

1. 在内存允许范围内,将频繁访问的文件或数据置于内存中;

2. 如果磁盘充裕,可以适当增加源数据在HDFS上的备份数以减少网络传输;

3. Spark支持多种文件格式及压缩方式,根据不同的应用环境进行合理的选择。如果每次计算只需要其中的某几列,可以使用列式文件格式,以减少磁盘I/O,常用的列式有parquet、rcfile。如果文件过大,将原文件压缩可以减少磁盘I/O,例如:gzip、snappy、lzo。

【其他】

广播变量(broadcast)

当task中需要访问一个Driver端较大的数据时,可以通过使用SparkContext的广播变量来减小每一个任务的大小以及在集群中启动作业的消耗。参考官方文档http://spark.apache.org/docs/latest/tuning.html#broadcasting-large-variables

开启推测机制

推测机制后,如果集群中,某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。

在spark-default.conf 中添加:spark.speculation true

推测机制与以下几个参数有关:

1. spark.speculation.interval 100:检测周期,单位毫秒;

2. spark.speculation.quantile 0.75:完成task的百分比时启动推测;

并行度设置

1.Spark的并行度指的是什么?

** spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度!**

当分配完所能分配的最大资源了,然后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么**导致****你分配下去的资源都浪费掉了。****同时并行运行,还可以让每个task要处理的数量变少(**很简单的原理。合理设置并行度,可以充分利用集群资源,减少每个task处理数据量,而增加性能加快运行速度。**)**

举例:

    假如, 现在已经在spark-submit 脚本里面,给我们的spark作业分配了足够多的资源,比如**50个executor **,每个**executor 有10G内存**,**每个executor有3个cpu core** 。 基本已经达到了集群或者yarn队列的资源上限。

task没有设置,或者设置的很少,比如就设置了,100个task 。 50个executor ,每个executor 有3个core ,也就是说****Application 任何一个stage运行的时候,都有总数150个cpu core ,可以并行运行。但是,你现在只有100个task ,平均分配一下,每个executor 分配到2个task,ok,那么同时在运行的task,只有100个task,每个executor 只会并行运行 2个task。 每个executor 剩下的一个cpu core 就浪费掉了!你的资源,虽然分配充足了,但是问题是, 并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。****合理的并行度的设置,应该要设置的足够大,大到可以完全合理的利用你的集群资源; 比如上面的例子,总共集群有150个cpu core ,可以并行运行150个task。那么你就应该将你的Application 的并行度,至少设置成150个,才能完全有效的利用你的集群资源,让150个task ,并行执行,而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数量变少; 比如总共** 150G 的数据要处理, 如果是100个task 每个task 要计算1.5G的数据。 现在增加到150个task,每个task只要处理1G数据**。

2.如何去提高并行度?

1、task数量,至少设置成与spark Application 的总cpu core 数量相同(最理性情况,150个core,分配150task,一起运行,差不多同一时间运行完毕)**官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300~ 500. **与理性情况不同的,有些task 会运行快一点,比如50s 就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会导致资源的浪费,因为 比如150task ,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个cpu core空闲出来了,导致浪费。如果设置2~3倍,那么一个task运行完以后,另外一个task马上补上来,尽量让cpu core不要空闲。同时尽量提升spark运行效率和速度。提升性能。

** 2、如何设置一个Spark Application的并行度?**

  **spark.defalut.parallelism** **默认是没有值的,如果设置了值比如说10,是在shuffle的过程才会起作用**(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分区数就是10,rdd1的分区数不受这个参数的影响)

** new SparkConf().set(“spark.defalut.parallelism”,”“500)**

** 3、如果读取的数据在HDFS上,增加block数**,默认情况下split与block是一对一的,而split又与RDD中的partition对应,所以增加了block数,也就提高了并行度。

**4、RDD.repartition**,给RDD重新设置partition的数量

**5、reduceByKey的算子指定partition的数量**

             val rdd2 = rdd1.reduceByKey(_+_,10)  val rdd3 = rdd2.map.filter.reduceByKey(_+_)

** 6、val rdd3 = rdd1.join**(rdd2) rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量。


** 7、spark.sql.shuffle.partitions //spark sql中shuffle过程中partitions的数量**

3. spark.speculation.multiplier 1.5:比其他的慢多少倍时启动推测。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容

  • 1、 性能调优 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所...
    Frank_8942阅读 4,534评论 2 36
  • 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所能分配的CPU数...
    miss幸运阅读 3,178评论 3 15
  • 1.分配更多的资源 -- 性能调优的王道 真实项目里的脚本: bin/spark-submit \ --c...
    evan_355e阅读 1,844评论 0 0
  • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AM...
    大佛爱读书阅读 2,824评论 0 20
  • 补档。
    沃雷塔尔阅读 101评论 0 0