1 内存有限的情况下 Spark 如何处理 T 级别的数据?
Spark被称为“内存计算引擎”是因为它可以做内存计算,而不是它只能做内存计算。Spark确实擅长内存计算,内存容量不足时也可以回退。
实际上,内存cache对于Spark来说仅仅只是一个优化,即便完全关闭,效率仍然比MapReduce要来得高。
对于Spark这类内存计算系统,并不是说要处理多大规模的数据就需要多大规模的内存。Spark相对Hadoop MR有大幅性能提升的一个前提就是大量大数据作业同一时刻需要加载进内存的数据只是整体数据的一个子集,且大部分情况下可以完全放入内存。
至于数据子集仍然无法放入集群物理内存的情况,Spark仍然可以妥善处理
在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地。
如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,一个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。
2 Spark内存溢出的问题
简介
Spark中的OOM问题不外乎以下两种情况
map执行中内存溢出
shuffle后内存溢出
map执行中内存溢出代表了所有map类型的操作。包括:flatMap,filter,mapPartitions等。
shuffle后内存溢出的shuffle操作包括join,reduceByKey,repartition等操作。
内存模型
Spark在一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。other占用相对小很多。
execution内存是执行内存,文档中说join,aggregate都在这部分内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,能够减少IO。其实map过程也是在这个内存中执行的。
storage内存是存储broadcast,cache,persist数据的地方。
other内存是程序执行时预留给自己的内存。
内存溢出解决方法
- map过程产生大量对象导致内存溢出
单个map中产生了大量的对象导致的。
例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),
这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。
针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。
例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
- 数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。
- coalesce调用导致内存溢出
因为hdfs中不适合存小问题,所以Spark计算后如果产生的文件太小,我们会调用coalesce合并文件再存入hdfs中。
但是这会导致一个问题,例如在coalesce之前有100个文件,这也意味着能够有100个Task,现在调用coalesce(10),最后只产生10个文件。
因为coalesce并不是shuffle操作,这意味着coalesce并不是按照我原本想的那样先执行100个Task,再将Task的执行结果合并成10个,而是从头到位只有10个Task在执行,原本100个文件是分开执行的,现在每个Task同时一次读取10个文件,使用的内存是原来的10倍,这导致了OOM。
解决这个问题的方法是令程序按照我们想的先执行100个Task再将结果合并成10个文件,这个问题同样可以通过repartition解决,调用repartition(10),因为这就有一个shuffle的过程,shuffle前后是两个Stage,一个100个分区,一个是10个分区,就能按照我们的想法执行。
- shuffle后内存溢出
shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。
在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner
- standalone模式下资源分配不均匀导致内存溢出
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。
- 在RDD中,共用对象能够减少OOM的情况
类似这样rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,”value”))导致OOM
使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+”value”)就不会有OOM的问题。
因为每次(“key”,”value”)都产生一个Tuple对象,而”key”+”value”,不管多少个,都只有一个对象,指向常量池。
3 优化方式
使用mapPartitions代替大部分map操作,或者连续使用的map操作
RDD强调的是不可变对象,每个RDD都是不可变的,当调用RDD的map类型操作的时候,都是产生一个新的对象,这就导致了一个问题,如果对一个RDD调用大量的map类型操作的话,每个map操作会产生一个到多个RDD对象。
在长操作中,可以在mapPartitons中将RDD大量的操作写在一起,避免产生大量的中间rdd对象
另外是mapPartitions在一个partition中可以复用可变类型,这也能够避免频繁的创建新对象。使用mapPartitions的弊端就是牺牲了代码的易读性。
broadcast join和普通join
在大数据分布式系统中,大量数据的移动对性能的影响也是巨大的。
基于这个思想,在两个RDD进行join操作的时候,如果其中一个RDD相对小很多,可以将小的RDD进行collect操作然后设置为broadcast变量,这样做之后,另一个RDD就可以使用map操作进行join,这样能够有效的减少相对大很多的那个RDD的数据移动。
3 shuffle 和 stage
什么是shuffle
shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤.
RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作.
宽依赖会发生 shuffle 操作.
窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。
// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
Map(x => (x._1, x._2.toList.length))
第一个 Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化.
第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作.
shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.
spark的shuffle过程
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
4 Spark运行原理
我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。
Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。
Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。
Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个Task处理的数据不同而已。
一个stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。
如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。
可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块:第一块是让Task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让Task通过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
在实际编程中,我们不需关心以上调度细节.只需使用 Spark 提供的指定语言的编程接口调用相应的 API 即可.
在 Spark API 中, 一个 应用(Application) 对应一个 SparkContext 的实例。一个 应用 可以用于单个 Job,或者分开的多个 Job 的 session,或者响应请求的长时间生存的服务器。与 MapReduce 不同的是,一个 应用 的进程(我们称之为 Executor),会一直在集群上运行,即使当时没有 Job 在上面运行。
而调用一个Spark内部的 Action 会产生一个 Spark job 来完成它。 为了确定这些job实际的内容,Spark 检查 RDD 的DAG再计算出执行 plan 。这个 plan 以最远端的 RDD 为起点(最远端指的是对外没有依赖的 RDD 或者 数据已经缓存下来的 RDD),产生结果 RDD 的 Action 为结束 。并根据是否发生 shuffle 划分 DAG 的 stage.
有向无环图(DAG):Directed Acycle graph,反应RDD之间的依赖关系
5 Spark MLlib
MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:
算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;
特征化工具:特征提取、转化、降维,和选择工具;
管道(Pipeline):用于构建、评估和调整机器学习管道的工具;
持久性:保存和加载算法,模型和管道;
实用工具:线性代数,统计,数据处理等工具。
Spark 机器学习库从 1.2 版本以后被分为两个包:
spark.mllib
包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。spark.ml
则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。
使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。这种方式给我们提供了更灵活的方法,更符合机器学习过程的特点,也更容易从其他语言迁移。Spark官方推荐使用spark.ml。如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。
需要注意的是,从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。