1、重构RDD架构和RDD持久化
RDD架构重构与优化
尽量去复用RDD,差不多的RDD,可以抽取成一个共同的RDD,供后面的RDD计算时,反复使用;公共RDD一定要实现持久化
如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作,那么就非常有必要对其进行持久化操作,以避免对同一个RDD反复进行计算;
此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。
2、使用序列化的持久化级别
在对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能,因为,通常情况下,RDD的数据是持久化到内存,或者是磁盘中的。
那么此时,如果内存不是特别充足,完全可以使用序列化的持久化级别,
比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等,使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)来设置;
这样的话,将数据序列化之后,再持久化,可以大大减少对内存的消耗,同时,数据量小了后,如果要写入磁盘,那么磁盘io性能消耗也比较小。
对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组,这样,对于内存的消耗就小的多了,但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。
对于序列化的持久化级别,还可以进一步优化,可以使用Kryo序列化库,
从而获得更快的序列化速度,并且占用更小的内从空间。
3、广播共享数据
- 背景:
默认情况下,task执行的算子中,如果使用了外部的变量,那么每个task都会获取一份变量的副本;
这种情况下,如果使用到了特别大的数据10M(1M-100M),同时task数量是500,那么spark作业,首先会把这个10M的变量,拷贝500份,然后通过网络传输到各个task中去,给task使用。这时,总共会有5G的数据会通过网络传输,这个网络的开销可不小。这个变量到task上以后,也会占用task内存,一下子就会消耗掉5G的内存,这些不必要的内存的消耗和占用,会导致当进行RDD持久化到内存时,内存放不下,然后只能写入磁盘,同时task在执行算子创建对象的时候,会发现堆内存放不下所有对象,也许就会导致频繁的GC,GC的时候,一定是会导致工作线程停止,那么spark作业也只能暂停了,频繁GC的话,对Spark作业的运行速度回有相当大的影响。
解决办法:将该数据进行广播。
这样的话,就不至于将一个大数据拷贝到每一个task上去,而是给每个节点的executor拷贝一份,然后executor上的task共享该数据。这样的话,就可以大大减少大数据在节点上的内存消耗,并且可以减少数据到节点的网络传输消耗。广播过程:初始的时候,就在Driver上有一份副本,task在运行的时候,想要使用广播变量中的数据,此时首先会在本地的executor对应的blockManager中,尝试获取变量副本,如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的blockManager中,此后,这个executor上的task都会直接私用本地的BlockManager中的副本。
executor的blockManager除了从driver上拉取,也可能从其他节点的blockManger上拉取变量副本,距离越近越好。
BlockManager:负责管理某个Executor上对应的内存和磁盘上的数据,详细描述过程见前面写的BlockManager原理。
4、数据本地化
数据本地化,指的是,数据离计算它的代码有多近,数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高;如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。
通常来说,移动代码到其他节点,会比移动数据到代码所在的节点速度要快的多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。
基于数据距离代码的距离,有几种数据本地化级别:
- 1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中;
- 2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如,在不同的executor进程中,或者是数据在HDFS文件的block中;
- 3、NO_PREF:数据从哪里过来,性能都是一样的;
- 4、RACK_LOCAL:数据和计算它的代码在一个机架上;
- 5、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上
Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partiion(见task分配算法),Spark倾向于使用最好的本地化级别来调度task,即每个task正好分配到它要计算的数据所在的节点,这样的话,就不用再网络中传输数据,但是通常来说,不可能每个task都这样分配,因为可能那个节点的计算资源和计算能力都满了,这时候就没有空闲的executor来处理数据,那么Spark就会放低本地化级别,这时有两个选择,第一,等待,直到executor上的cpu释放出来,然后就分配task过去;第二,立即在任意一个executor上启动一个task
Spark默认等一会,来期望数据所在的节点上的executor空闲出一个cpu,从而将task分配过去;只要超过了设置的时间,那么Spark就会将task分配到其他任意一个空闲的executor上,这时候就会发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现本地没有数据,会通过一个gerRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中获取数据,通过网络传输回task所在节点。
那么,我们调节什么?
可以设置spark.locality系列参数来调节Spark等待task可以进行数据本地化的时间,spark.locality.wait、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack-
什么情况下调节
观察spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别;如果大多都是PROCESS_LOCAL,那就不用调节了
如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长,调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志;看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短注意,别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了
设置方式
new SparkConf().set("spark.locality.wait", "10")
5、reduceByKey 和 groupByKey
val counts = pairs.reduceByKey(+)
val counts = pairs.groupByKey().map(wordCounts = > (wordCounts._1,wordCounts._2.sum))
如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减少网络传输的开销。
只有在reduceByKey处理不来时,采用groupByKey().map()来替代。