读书计划: spark源码分析和学习
参考书记:《spark大数据处理技术》《 深入理解spark:核心思想和源码分析》
Matei Zaharia写的《An Architecture for Fast and General Data Processing on Large Clusters》
学习周期:1个月(2017.4.24—2017.5.24)
第一步:阅读Matei Zaharia的论文,回顾了解spark的设计。
第二部:依据以上两本书分一下8部分来详细了解spark的使用 设计 实现
- RDD
- Spark运行模式及其原理
- spark调度管理原理
- spark存储管理
- spark sql
- spark streaming
- graphx
论文
RDD概述
RDD 设计时的最大挑战在于定义一个能提供高效容错能力的编程接口。现有的基于集群的内存存储抽象,比如分布式共享内存[79],键-值存储[81],数据库,以及 Piccolo[86],提供了一个对内部状态基于细粒度更新的接口(例如,表格里面的单元).在这样的设计之下,提供容错性的方法就要么是在主机之间复制数据,要么对各主机的更新情况做日志记录。这两种方法对于数据密集型的任务来说代价很高,因为它们需要在带宽远低于内存的集群网络间拷贝大量的数据,同时还将产生大量的存储开销。
与上述系统不同的是,RDD提供一种基于粗粒度变换(如, map, filter, join)的接口,该接口会将相同的操作应用到多个数据集上。这使得他们可以通过记录用来创建数据集的变换(lineage),而不需存储真正的数据,进而达到高效的容错性。1 当一个RDD的某个分区丢失的时候,RDD记录有足够的信息记录其如何通过其他的RDD进行计算,且只需重新计算该分区。因此,丢失的数据可以被很快的恢复,而不需要昂贵的复制代价。
从形式上看,RDD是一个分区的只读记录的集合。RDD只能通过在(1)稳定的存储器或(2)其他RDD的数据上的确定性操作来创建。我们把这些操作称作变换以区别其他类型的操作。例如 map, filter, 和 join。2
RDD 在任何时候都不需要被"物化"(进行实际的变换并最终写入稳定的存储器上)。实际上,一个 RDD 有足够的信息描述着其如何从其他稳定的存储器上的数据生成。它有一个强大的特性:从本质上说,若 RDD 失效且不能重建,程序将不能引用该 RDD。
最后,用户可以控制 RDD 的其他两个方面:持久化和分区。用户可以选择重用哪个 RDD,并为其制定存储策略(比如, 内存存储)。也可以让 RDD 中的数据根据记录的 key 分布到集群的多个机器。这对位置优化来说是有用的,比如可用来保证两个要Jion的数据集都使用了相同的哈希分区方式。
Spark直到RDD第一次调用一个动作时才真正计算RDD。这也就使得Spark可以按序缓存多个变换。
我们可以将 transformations 操作理解成一种惰性操作,它只是定义了一个新的 RDD,而不是立即计算它。相反,actions 操作则是立即计算,并返回结果给程序,或者将结果写入到外存储中。
抽象RDD:
简而言之,我们提供了一个通用接口来抽象每个 RDD,并提供 5 种信息:一组分区,他们是数据集的最小分片;一组 依赖关系,指向其父 RDD;一个函数,基于父 RDD 进行计算;以及划分策略和数据位置的元数据。
在设计接口的过程中,最有趣的问题在于如何表示 RDD 之间的依赖关系。我们发现,比较合理的方式是将依赖关系分成两类:窄依赖:每个父 RDD 的分区都至多被一个子 RDD 的分区使用;宽依赖:多个子 RDD 的分区依赖一个父 RDD 的分区。例如,map 操作是一种窄依赖,而 join操作是一种宽依赖(除非父 RDD 已经基于 Hash 策略被划分过了)。
一些RDD实现的概念:
HDFS 文件:在我们的例子中,HDFS 文件作为输入 RDD。对于这些 RDD,partitions 代表文件中每个文件块的分区(包含文件块在每个分区对象中的偏移量),preferredLocations 表示文件块所在的节点,而 iterator 读取这些文件块。
map:在任何一个RDD上调用map操作将返回一个MappedRDD对象。这个对象与其父对象具有相同的分区以及首选地点(preferredLocations),但在其迭代方法(iterator)中,传递给 map的函数会应用到父对象记录。
union:在两个RDD上调用union操作将返回一个RDD,这个RDD的分区为原始两个RDD的父RDD的分区进行union后的结果。每个子分区都是通过窄依赖于同一个父级分区计算出来的。7
sample:抽样类似于映射。不同之处在于,RDD 会为每一个分区保存一个生成随机数的种子来对确定如何对父级记录进行抽样。
join:连接两个 RDD 可能会产生两个窄依赖,或两个宽依赖,或一个窄依赖和一个宽依赖。如果两个 RDD 都是基于相同的 Hash/范围划分策略,那么就会产生窄依赖;如果一个父 RDD 具有某种划分策略而另一个不具有,则会同时产生窄依赖和宽依赖。无论哪种情况,结果 RDD 都具有一个划分策略(要么继承自父 RDD,要么是一个默认的 Hash 划分策略)。
作业调度:
总的来说,我们的调度器与 Dryad 的[61]类似,但它额外会考虑被持久化(persist)的 RDD的那个分区保存在内存中并可供使用。当用户对一个 RDD 执行 Action(如 count 或 save)操作时, 调度器会根据该 RDD 的 lineage,来构建一个由若干 阶段(stage) 组成的一个 DAG(有向无环图)以执行程序,正如 2.5 所示。 每个 stage 都包含尽可能多的连续的窄依赖型转换。各个阶段之间的分界则是宽依赖所需的 shuffle 操作,或者是 DAG 中一个经由该分区能更快到达父 RDD 的已计算分区。之后,调度器运行多个任务来计算各个阶段所缺失的分区,直到最终得出目标 RDD。
调度器向各机器的任务分配采用延时调度机制[117]并根据数据存储位置(本地性)来确定。若一个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给那个节点。否则,如果一个任务处理的某个分区,该分区含有的 RDD 提供较佳的位置(例如,一个 HDFS 文件),我们把该任务分配到这些位置。
"对应宽依赖类的操作{比如w shuffle依赖),我们会将中间记录物理化到保存父分区的节点上。这和 MapReduce 物化 Map 的输出类似,能简化数据的故障恢复过程。"
针对调度器器自身失败的容错,拷贝相应 RDD 的 lineage 是比较直接的解决之道。但现阶段我们并不提供该类容错特性。
若某个任务执行缓慢 (即"落后者"straggler),系统则会在其他节点上执行该任务的拷贝这与 MapReduce 做法类似,并取最先得到的结果作为最终的结果。
scala解析器
Scala 解析器通常会为用户输入的每一行生成一个类,把它导入 JVM ,调用上面的一个函数。Scala 解析器的解析通常有如下组成:
将用户输入的每一行编译出其所对应的一个类;
将该类载入到 JVM 中;
调用该类的某个函数。这个类包含一个单例对象,对象中包含当前行的变量或函数,在初始化方法中包含运行该行的代码。例如,如果用户键入 var x = 5,换一行再键入 println(x),那解析器会定义一个叫 Line1 的类,该类包含 x。第二行编译成println(Line1.getInstance().x)。
内存管理
Spark提供了三种对持久化RDD的存储策略:未序列化Java对象存于内存中、序列化后的数据存于内存及磁盘存储。第一个选项的性能表现是最优秀的,因为可以直接访问在JAVA虚拟机内存里的RDD对象。在空间有限的情况下,第二种方式可以让用户采用比JAVA对象图更有效的内存组织方式,代价是降低了性能。8第三种策略适用于RDD太大难以存储在内存的情形,但每次重新计算该RDD会带来额外的资源开销。
对于有限可用内存,我们使用以 RDD 为对象的 LRU 回收算法来进行管理。当计算得到一个新的 RDD 分区,但却没有足够空间来存储它时,系统会从最近最少使用的 RDD 中回收其一个分区的空间。除非该 RDD 便是新分区对应的 RDD,这种情况下,Spark 会将旧的分区继续保留在内存,防止同一个 RDD 的分区被循环调入调出。这点很关键--因为大部分的操作会在一个 RDD 的所有分区上进行,那么很有可能已经存在内存中的分区将会被再次使用。到目前为止,这种默认的策略在我们所有的应用中都运行很好,当然我们也为用户提供了“持久化优先级”选项来控制 RDD 的存储。