引子
任何一个概念的引入都是为了解决某种问题,RDD亦然。关于RDD这个概念,先抛几个问题。
为什么引入RDD这个概念?
RDD到底是什么?
RDD是怎么实现的?
下面我们逐步阐述一下这三个问题。
part1:为什么要引入RDD?
想回答这个问题,最好的办法就是读原论文里面的论述,可以通过下面的链接下载阅读。
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
其中开篇首先讲述了为什么要设计RDD这样的一个模型,
Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory.This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations.
Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is interactive data mining, where a user runs multiple adhoc queries on the same subset of the data.
Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times.
从文中的描述可以看出,要解决一个核心问题,那就是目前的框架(MapReduce和Dryad)都缺乏利用分布式内存的抽象能力。对于那些需要在多个计算中复用中间结果类型的应用,不能很好地使用内存,而是要多次读写到磁盘中,性能普遍不佳。
当前的分布式编程框架,都是基于非循环的数据流模型。即从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组稳定性操作构成的DAG(Directed Acyclic Graph,有向无环图),然后写回稳定存储。需要将数据输出到磁盘,然后在每次查询时重新加载,这会带来较大的开销。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
其次,基于共享内存的抽象,都没有很好地解决容错性的问题,当一个数据块出错了之后,他们设计的方法就是在节点间复制数据来记录数据的更新。
Existing abstractions for in-memory storage on clusters, such as distributed shared memory [24], key value stores [25], databases, and Piccolo [27], offer an interface based on fine-grained updates to mutable state (e.g., cells in a table). With this interface, the only ways to provide fault tolerance are to replicate the data across machines or to log updates across machines. Both approaches are expensive for data-intensive workloads, as they require copying large amounts of data over the cluster network, whose bandwidth is far lower than that of RAM, and they incur substantial storage overhead.
RDD通过基于数据集粗粒度转换、线性血脉关系的管理,很好地解决了容错性问题,通过依赖关系还原现场重建数据块,而不是直接复制数据。
part2:RDD是什么?
2.1 基本概念
我们看看论文里面怎么说:
In this paper, we propose a new abstraction called resilient distributed datasets (RDDs) that enables efficient data reuse in a broad range of applications. RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.
第一强调的高效的复用;
第二是容错、并行、显式保存中间结果;
第三通过控制分区优化数据存储的位置;
第四是提供丰富的算子来操作这些分区。
大家经常看到的概念描述是这样的:弹性分布式数据集(RDD,Resilient Distributed Datasets)提供了一种高度受限的共享内存模型,即RDD是只读的、分区记录的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。
2.2 关于容错
设计RDD的一个主要挑战就是容错设计,Matei Zaharia在论文里面说到:
The main challenge in designing RDDs is defining a programming interface that can provide fault tolerance efficiently.
RDD如何进行容错设计呢?
首先看看一般的分布式共享内存系统是如何进行容错设计的:
分布式共享内存系统:高昂代价的检查点和回滚机制(IO和存储)
RDD的容错机制采用了一种完全不同的方案
RDD通过Lineage来重建丢失的分区:也称为记录更新,一个RDD中包含了如何从其他RDD衍生所必需的相关信息(血统),更新的方式是粗粒度变换(coarse-grained transformation),即仅记录在单个块上执行的单个操作,然后创建某个RDD的变换序列存储下来,当数据丢失时,我们可以用变换序列(血统)来重新计算,恢复丢失的数据,以达到容错的目的。
其实,对于lineage过长的情况,重建分区的代价也是很大的,这个时候需要通过建立checkpoint的方式持久化一部分中间结果。所以对于历史的方案,并没有完全抛弃。
2.3 每个RDD有5个主要的属性
•一组partitions(分片,可扩展性)
•计算每个分片的函数(transformation,action)
•RDD间的依赖关系(自动容错)
•选择项:一个K-V RDD的partitioner(自定义分区函数,一般是使用hash函数)
•选择项:存储每个partition的优先的(preferred)位置(本地优化分配)
2.3.1 RDD要素之一:partition
问题
1.partition的数量是如何决定的?
2. 多少个partition合适?
3.partition是如何分配到各个executor的?
partition的数量是如何决定的?
默认情况
1、默认情况下,当Spark从HDFS读一个文件的时候,会为一个输入的片段创建一个分区,也就是一个HDFS split对应一个RDD partition, 大小是64MB或者128MB,这个过程是自动完成的,不需要人工干预,但是这个分区之间的split是基于行的分割而不是按照块分割的。
2、使用默认读取文件命令时,分区数目可能会少,一般情况下是HDFS分区数目,如果文件一行长度很长(大于block大小),分区数会变少。
3、自定义分区数目只对非压缩文件生效,对于压缩文件,一个文件只能有一个分区,这个时候只能使用repartition修改分区数目了。
手动改变分区数目的操作
用户可以在读数据的时候可以自定义分区数目,并在后续的transformation和action中体现。
sc.textFile(path,partition)
repartition Transformation
repartition(numPartitions:Int)(implicitord: Ordering[T] = null): RDD[T]
Repartition is coalesce withnumPartitionsand shuffle enabled.
coalesce Transformation
coalesce(numPartitions:Int, shuffle: Boolean = false)(implicitord:
Ordering[T] = null): RDD[T]
The coalesce transformation is used to change the number of partitions. It can
trigger RDD shuffling depending on the shuffle flag (disabled by default, i.e.
false).
获取分区信息
def getPartitions: Array[Partition]
多少个partition合适?
每个executor分配的Task的数量和executor分配的CPU数量一致,而Task数量和分区数目一致。所以要平衡分区数目和申请的CPU资源。
一般情况下,分区文件小会导致分区数目增加,可以分配到更多的节点上进行计算,这样会提升速度;分区过大则分区数目减少,如果分区数目远少于分配的CPU数目,那么很多CPU就会空闲,速度会很慢。
Spark对每个RDD分区只能运行一个并行任务,最多同时运行任务数就是集群的CPU核心总数,总体讲建议一个CPU最多可以分配2-3个任务。所以总的分区数目理想数字也应该是分配的CPU的2-3倍之间。
分区的最大大小由executor的可用内存决定,如果分区过大,多个大的分区被分配到同一个executor中,超出了shuffle内存,则会出现内存溢出。
partition是如何分配到各个executor的?
首先引用一个Quora的答案
RDD is a dataset which is distributed, that is, it is divided into"partitions". Each of these partitions can be present in the memory or disk of different machines. If you want Spark to process the RDD, then Spark needs to launch one task per partition of the RDD. It’s best that each task be sent to the machine have the partition that task is supposed to process. In that case, the task will be able to read the data of the partition from the local machine. Otherwise, the task would have to pull the partition data over the network from a different machine, which is less efficient. This scheduling of tasks (that is, allocation of tasks to machines) such that the tasks can read data “locally” is known as “locality aware scheduling”.
分区任务分配
我们基本上都了解,计算和数据的本地化是分布式计算的一个重要思想,当数据和运算分离的时候就需要从其他节点拉数据,这个是要消耗网络IO的。
在进行任务分配的时候,要以网络传输消耗最小化为原则,Spark从最近的节点将数据读到RDD中。Spark首先会评估分区情况,当任务分配完毕后,会有若干个executor,而分区在若干个worker上,需要综合评估网络传输的代价,将不同的分区分配到不同的executor上。
taskSetManager在分发任务之前会先计算数据本地性,优先级依次是:
PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible
NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
NO_PREF data is accessed equally quickly from anywhere and has no locality preference
RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
ANY data is elsewhere on the network and not in the same rack
•当计算完本地性之后,再分发任务。
•Driver刚启动时候,executor还没有初始化完毕,一部分任务的本地化被设置为NO_PREF,ShuffleRDD的本地性始终为NO_PREF,在任务分配时候优先分配到非本地节点。
•具体怎么实现见下次Spark计算框架一讲。
并行度
在Standalone 或者 Yarn集群模式,默认并行度等于集群中所有核心数目的总和或者 2,取两者中的较大值(见 SparkDeploySchedulerBackend.defaultParallelism()。这个backend继承自CoarseGrainedSchedulerBackend)。
override def defaultParallelism():Int= {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),2))
}
可以通过设置并行度来进行再分区,参数如下:
2.3.2 RDD要素之二:函数
•分为两类:
•transformation
•action
输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。
如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。
Transformation和Actions
Transformation
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala,Java,Python,R) and pair RDD functions doc (Scala,Java) for details.
TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a functionfunc.
filter(func)Return a new dataset formed by selecting those elements of the source on whichfuncreturns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (sofuncshould return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, sofuncmust be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also providesfuncwith an integer value representing the index of the partition, sofuncmust be of type (Int, Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement,fraction,seed)Sample a fractionfractionof the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note:If you are grouping in order to perform an aggregation (such as a sum or average) over each key, usingreduceByKeyoraggregateByKeywill yield much better performance.
Note:By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optionalnumTasksargument to set a different number of tasks.
aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the booleanascendingargument.
join(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported throughleftOuterJoin,rightOuterJoin, andfullOuterJoin.
cogroup(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also calledgroupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command,[envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than callingrepartitionand then sorting within each partition because it can push the sorting down into the shuffle machinery.
Action
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala,Java,Python,R)
and pair RDD functions doc (Scala,Java) for details.
ActionMeaning
reduce(func)Aggregate the elements of the dataset using a functionfunc(which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the firstnelements of the dataset.
takeSample(withReplacement,num, [seed])Return an array with a random sample ofnumelements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n,[ordering])Return the firstnelements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a functionfuncon each element of the dataset. This is usually done for side effects such as updating anAccumulatoror interacting with external storage systems.
Note: modifying variables other than Accumulators outside of theforeach()may result in undefined behavior. SeeUnderstanding closuresfor more details.
The Spark RDD API also exposes asynchronous versions of some actions, likeforeachAsyncforforeach, which immediately return aFutureActionto the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
2.3.3 RDD要素之三:依赖关系(自动容错)
RDD作为数据结构,本质上是一个只读的分区记录集合。
我们可以用数据库视图的概念来理解RDD(其实DataFrame更像)。
一个RDD可以包含多个分区,每个分区就是一个dataset片段;数据库的表也可以创建很多分区。
有的SQL语句只需要查询一张表,有的需要关联多张表,RDD也是一样的,有的数据只需要操作一个集合,有的需要做关联,本质上是一样的,这里就引入了依赖的概念。
有的SQL只需要一张表,比如简单过滤,有的需要多张表进行关联。
简单来讲,对RDD的操作,如果只需要一个RDD,则是窄依赖,但是多个RDD的却未必是宽依赖,这就是比较难理解的地方。
之所以实际情况要复杂一些,这和Spark的shuffle机制有关,本质上是否shuffle是区分宽窄依赖的判断条件,实际上多个RDD操作未必会发生shuffle,因为一个分区会被一个executor处理,如果两个RDD的数据都在一个分区里面,结果就直接在自己的executor内完成了,就不会发生shuffle,下面详解。
依赖的定义
先看一张图
我们可以看出依赖可以分为NarrowDependency和ShuffleDependency。
窄依赖NarrowDependency
窄依赖的基本判断原则
父RDD中的一个分区最多只会被子RDD中的一个分区使用
换句话说,父RDD中,一个分区内的数据是不能被分割的,必须整个交付给子RDD中的一个分区。因为一旦分割就涉及到shuffle,一旦涉及到shuffle,就成了宽依赖,所以宽依赖也叫shuffle依赖。
窄依赖共有两种实现,一种是一对一的依赖,即 OneToOneDependency:
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
从其 getParents 方法可以看出 OneToOneDependency 的依赖关系下,子 RDD 的 partition 仅依赖于唯一 parent RDD 的相同 index 的 partition。
另一种窄依赖的实现是 RangeDependency,它仅仅被 UnionRDD 使用,UnionRDD 把多个 RDD 合成一个 RDD,这些 RDD 是被拼接而成,其 getParents 实现如下:
override def getParents(partitionId:Int):List[Int] = {
if(partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
如下图所示:
哪些操作属于窄依赖呢?
•map
•flatMap
•filter
•union
•部分join
•sample
宽依赖ShuffleDependency
宽依赖( Shuffle 依赖)中:
父RDD中的分区被多个子RDD分区使用。
因为父RDD中一个分区内的数据会被分割,发送给子RDD的所有分区,因此Shuffle依赖也意味着父RDD 与子 RDD之间存在着Shuffle过程。
哪些操作属于宽依赖呢?
•部分join
•groupByKey
•reduceByKey
•groupWith
•cartesian
转换中的依赖情况
第一:依赖关系是两个RDD 之间的依赖。
第二:若一次转换操作中父RDD 有多个,则可能会同时包含窄依赖和 Shuffle 依赖。
右图所示的Join 操作中:
RDDa 和 RDD c 采用了相同的分区器,两个RDD 之间是窄依赖,父RDD a分区没有被分割,属于窄依赖
Rdd b 的分区器与RDD c不同,父RDD b分区被分割,因此它们之间是Shuffle依赖
窄依赖和宽依赖的作用
概念1:计算链(computing chain)
概念2:Stage
概念3:有向无环图DAG
把RDD每个分区内数据的计算当成一个并行任务,每个并行任务包含一个计算链,将一个计算链交付给一个CPU核心去执行,集群中的CPU核心一起把RDD内的所有分区计算出来。
没必要保留中间过程的情况
断链—shuffle dependency
Shuffle依赖需要所有的父RDD
所以每次使用的时候,在shuffle时候都需要父RDD,如果不保存下来,就得重新计算。
保存是更好的选择,所以计算链从shuffle处断开,划分为不同的阶段(stage),阶段之间存在shuffle依赖,不同的stage构成了DAG。
2.3.4 RDD要素之四:partitioner
HashPartitioner与RangePartitioner
由于分区器能够间接决定RDD中分区的数量和分区内部数据记录的个数,因此选择合适的分区器能够有效提高并行计算的性能
哈希分析器的实现简单,运行速度快,但其本身有一明显的缺点:由于不关心键值的分布情况,其散列到不同分区的概率会因数据而异,个别情况下会导致一部分分区分配得到的数据多,一部分则比较少。
范围分区器则在一定程度上避免这个问题,范围分区器争取将所有的分区尽可能分配得到相同多的数据,并且所有分区内数据的上界是有序的。
HashPartitioner
•功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作
•例如:将同一个Host的URL分配到一个节点上,直接对域名做partition是无法实现的。
RangePartitioner
•主要用于RDD的数据排序相关API中,比如sortByKey底层使用的数据分区器就是RangePartitioner分区器;该分区器的实现方式主要是通过两个步骤来实现的,
•第一步:先重整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
•第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。
•例如:排序,把数据按照范围分区,这样各个分区排序之后,把数据组合起来就是正确的排序结果了。
2.3.5 RDD要素之五:本地存储优化
数据本地化是影响Spark job性能的一个重要的指标。如果数据和代码在一起,计算的速度应该会加快。如果数据和代码不在一块呢?我们必须移动其中的一个,使二者在一起。通常情况下,移动代码比移动数据更加高效(代码的体积通常比较小)。Spark在进行任务调度的时候就围绕这样一个原则:数据本地化原则(数据不动,代码动的原则)。
•PROCESS_LOCAL:数据和代码在同一个JVM里。这是最理想的数据本地性。
•NODE_LOCAL:数据和代码在相同的节点上,例如在同一HDFS的存储节点上,或者在同一节点的两个不同的executor里。这种级别的数据本地性会比PROCESS_LOCAL慢一点,因为数据需要在两个进程间传递。
•NO_PREF:数据可以被从任何位置进行访问,我们都看成是一样的。
•RACK_LOCAL:数据在同一机架的不同机器上,因此数据传输需要通过网络,典型情况下需要通过一个交换机。
•ANY:数据不在同一机架的互联网上的任意位置。
•Spark优先选择使用最佳的本地性级别,但是如果在每个空闲的executor里没有未处理的数据,Spark会降低这种级别。牵涉到两点:1等待忙着的CPU空闲了重新开一个task,这样数据就不用传输了。2在远端开一个新的任务,把数据传过去。
•Spark的做法是先等待一段时间(spark.locality),如果在等待时间内忙着的CPU闲下来了,采用1,否者采用2.
Part3:RDD在底层是如何实现的?
RDD底层实现原理
第一:RDD是分布式数据集,所以各个部分存在多台机器上。
第二:每个部分的形式是Block。
第三:每个Executor启动BlockManagerSlave对象来管理所操作的Block。
第四:Block的元数据由Driver节点的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注册该Block。
第五:BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。
RDD的逻辑与物理架构
用户程序对RDD通过多个函数进行操作,将RDD进行转换。Block-Manager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘。
而RDD中的partition是一个逻辑数据块,对应相应的物理块Block。本质上一个RDD在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系。
数据与计算的关系