spark性能优化三:RDD操作优化

1、重构RDD架构和RDD持久化

  • RDD架构重构与优化
    尽量去复用RDD,差不多的RDD,可以抽取成一个共同的RDD,供后面的RDD计算时,反复使用;

  • 公共RDD一定要实现持久化
    如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作,那么就非常有必要对其进行持久化操作,以避免对同一个RDD反复进行计算;

此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。

2、使用序列化的持久化级别

在对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能,因为,通常情况下,RDD的数据是持久化到内存,或者是磁盘中的。

那么此时,如果内存不是特别充足,完全可以使用序列化的持久化级别,
比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等,使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)来设置;

这样的话,将数据序列化之后,再持久化,可以大大减少对内存的消耗,同时,数据量小了后,如果要写入磁盘,那么磁盘io性能消耗也比较小。

对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组,这样,对于内存的消耗就小的多了,但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。

对于序列化的持久化级别,还可以进一步优化,可以使用Kryo序列化库,
从而获得更快的序列化速度,并且占用更小的内从空间。

3、广播共享数据

  • 背景:
    默认情况下,task执行的算子中,如果使用了外部的变量,那么每个task都会获取一份变量的副本;

这种情况下,如果使用到了特别大的数据10M(1M-100M),同时task数量是500,那么spark作业,首先会把这个10M的变量,拷贝500份,然后通过网络传输到各个task中去,给task使用。这时,总共会有5G的数据会通过网络传输,这个网络的开销可不小。这个变量到task上以后,也会占用task内存,一下子就会消耗掉5G的内存,这些不必要的内存的消耗和占用,会导致当进行RDD持久化到内存时,内存放不下,然后只能写入磁盘,同时task在执行算子创建对象的时候,会发现堆内存放不下所有对象,也许就会导致频繁的GC,GC的时候,一定是会导致工作线程停止,那么spark作业也只能暂停了,频繁GC的话,对Spark作业的运行速度回有相当大的影响。

  • 解决办法:将该数据进行广播。
    这样的话,就不至于将一个大数据拷贝到每一个task上去,而是给每个节点的executor拷贝一份,然后executor上的task共享该数据。这样的话,就可以大大减少大数据在节点上的内存消耗,并且可以减少数据到节点的网络传输消耗。

  • 广播过程:初始的时候,就在Driver上有一份副本,task在运行的时候,想要使用广播变量中的数据,此时首先会在本地的executor对应的blockManager中,尝试获取变量副本,如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的blockManager中,此后,这个executor上的task都会直接私用本地的BlockManager中的副本。
    executor的blockManager除了从driver上拉取,也可能从其他节点的blockManger上拉取变量副本,距离越近越好。

BlockManager:负责管理某个Executor上对应的内存和磁盘上的数据,详细描述过程见前面写的BlockManager原理。

4、数据本地化

数据本地化,指的是,数据离计算它的代码有多近,数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高;如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。
通常来说,移动代码到其他节点,会比移动数据到代码所在的节点速度要快的多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。

基于数据距离代码的距离,有几种数据本地化级别:

  • 1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中;
  • 2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如,在不同的executor进程中,或者是数据在HDFS文件的block中;
  • 3、NO_PREF:数据从哪里过来,性能都是一样的;
  • 4、RACK_LOCAL:数据和计算它的代码在一个机架上;
  • 5、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上
image.png

Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partiion(见task分配算法),Spark倾向于使用最好的本地化级别来调度task,即每个task正好分配到它要计算的数据所在的节点,这样的话,就不用再网络中传输数据,但是通常来说,不可能每个task都这样分配,因为可能那个节点的计算资源和计算能力都满了,这时候就没有空闲的executor来处理数据,那么Spark就会放低本地化级别,这时有两个选择,第一,等待,直到executor上的cpu释放出来,然后就分配task过去;第二,立即在任意一个executor上启动一个task

Spark默认等一会,来期望数据所在的节点上的executor空闲出一个cpu,从而将task分配过去;只要超过了设置的时间,那么Spark就会将task分配到其他任意一个空闲的executor上,这时候就会发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现本地没有数据,会通过一个gerRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中获取数据,通过网络传输回task所在节点。

  • 那么,我们调节什么?
    可以设置spark.locality系列参数来调节Spark等待task可以进行数据本地化的时间,spark.locality.wait、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack

  • 什么情况下调节
    观察spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
    日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别;

    如果大多都是PROCESS_LOCAL,那就不用调节了
    如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长,调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志;看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短

    注意,别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了

  • 设置方式
    new SparkConf().set("spark.locality.wait", "10")

5、reduceByKey 和 groupByKey

val counts = pairs.reduceByKey(+)

val counts = pairs.groupByKey().map(wordCounts = > (wordCounts._1,wordCounts._2.sum))

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减少网络传输的开销。

只有在reduceByKey处理不来时,采用groupByKey().map()来替代。

image.png
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容

  • 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所能分配的CPU数...
    miss幸运阅读 3,179评论 3 15
  • 1、 性能调优 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所...
    Frank_8942阅读 4,539评论 2 36
  • 1.分配更多的资源 -- 性能调优的王道 真实项目里的脚本: bin/spark-submit \ --c...
    evan_355e阅读 1,846评论 0 0
  • 前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领...
    Alukar阅读 553评论 0 6
  • 赶了一天一夜的火车,决定先洗澡再出门,我房间吹风机坏了,带着湿漉漉的头发跑到你的房间,你接过我粉红色的浴巾,帮我擦...
    岚梦竹阅读 346评论 0 3