SparkRDD
1)什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。
Dataset:
一个数据集合,用于存放数据的。
Distributed:
RDD中的数据是分布式存储的,可用于分布式计算。
Resilient:
RDD中的数据可以存储在内存中或者磁盘中。
2)RDD的属性
**1. A list of partitions :一个分区(Partition)列表,数据集的基本组成单位。**
对于RDD来说,每个分区都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。(比如:读取HDFS上数据文件产生的RDD分区数跟block的个数相等)
**2.A function for computing each split :一个计算每个分区的函数。**
Spark中RDD的计算是以分区为单位的,每个RDD都会实现compute函数以达到这个目的。
**3.A list of dependencies on other RDDs:一个RDD会依赖于其他多个RDD,RDD之间的依赖关系。**
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
**4.Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):一个Partitioner,即RDD的分区函数(可选项)。**
当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一 个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了parent RDD Shuffle输出时的分区数量。
**5.Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):一个列表,存储每个Partition的优先位置(可选项)。**
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置(spark进行任务分配的时候尽可能选择那些存有数据的worker节点来进行任务计算)。
为什么会产生RDD?
1.传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算中要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法。
(2) RDD是Spark提供的最重要的抽象的概念,它是一种具有容错机制的特殊集合,可以分布在集群的节点上,以函数式编程来操作集合,进行各种并行操作。可以把RDD的结果数据进行缓存,方便进行多次重用,避免重复计算。
RDD在Spark中的地位及作用
1.为什么会有Spark?
因为传统的并行计算模型无法有效的解决迭代计算(iterative)
和交互式计算(interactive)
;而Spark的使命便是解决这两个问题,这也是他存在的价值和理由。
2.Spark如何解决迭代计算?
其主要实现思想就是RDD,把所有计算的数据保存在分布式的内存中。迭代计算通常情况下都是对同一个数据集做反复的迭代计算,数据在内存中将大大提升IO操作。这也是Spark涉及的核心:内存计算
。
3.Spark如何实现交互式计算?
因为Spark是用scala语言实现的,Spark和scala能够紧密的集成,所以Spark可以完美的运用scala的解释器,使得其中的scala可以向操作本地集合对象一样轻松操作分布式数据集。
4.Spark和RDD的关系?
RDD是一种具有容错性、基于内存计算的抽象方法,RDD是Spark Core的底层核心,Spark则是这个抽象方法的实现。
创建RDD
1.由一个已经存在的Scala集合创建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2.由外部存储系统的文件创建。包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。
val rdd2 = sc.textFile("/words.txt")
3.已有的RDD经过算子转换生成新的RDD
val rdd3=rdd2.flatMap(_.split(" "))
RDD编程API
1 RDD的算子分类
Transformation(转换):根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:一个rdd进行map操作后生了一个新的rdd。
Action(动作):对rdd结果计算后返回一个数值value给驱动程序;
例如:collect算子将数据集的所有元素收集完成返回给驱动程序。
2 Transformation
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
Action
RDD常用的算子操作
Spark Rdd的所有算子操作,请见《sparkRDD函数详解.docx》
启动spark-shell 进行测试:
spark-shell --master spark://node1:7077
创建RDD
1.由一个已经存在的Scala集合创建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2.由外部存储系统的文件创建。包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。
val rdd2 = sc.textFile("/words.txt")
3.已有的RDD经过算子转换生成新的RDD
val rdd3=rdd2.flatMap(_.split(" "))
练习1:map、filter
//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每一个元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于5的元素
val rdd3 = rdd2.filter(_ >= 5)
//将元素以数组的方式在客户端显示
rdd3.collect
练习2:flatMap
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd1里面的每一个元素先切分在压平
val rdd2 = rdd1.flatMap(_.split(" "))
rdd2.collect
练习3:交集、并集
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect rdd4.collect
练习4:join、groupByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val rdd3 = rdd1.join(rdd2) rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2 rdd4.collect
//按key进行分组
val rdd5=rdd4.groupByKey rdd5.collect
练习5:cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
rdd3.collect
练习6:reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _) rdd2.collect
练习7:reduceByKey、sortByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _) rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
练习8:repartition、coalesce
val rdd1 = sc.parallelize(1 to 10,3)
//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.size
//增加分区
rdd1.repartition(4).partitions.size
//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size
注意:repartition可以增加和减少rdd中的分区数,coalesce只能减少rdd分区数,增加rdd分区数不会生效。
1.1.1 RDD的依赖关系
1RDD的依赖
RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
2窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女
3宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:宽依赖我们形象的比喻为超生
4Lineage(血统)
RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD的缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集。当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
RDD缓存方式
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
DAG的生成
什么是DAG
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
Spark任务调度
1任务调度流程图
各个RDD之间存在着依赖关系,这些依赖关系就形成有向无环图DAG,
DAGScheduler对这些依赖关系形成的DAG进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行stage切分。完成了Stage的划分。DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskSchduler 负责具体的task调度,最后在Worker节点上启动task。
2 DAGScheduler
1.DAGScheduler对DAG有向无环图进行Stage划分。
2.记录哪个RDD或者 Stage 输出被物化(缓存),通常在一个复杂的shuffle之后,通常物化一下(cache、persist),方便之后的计算。
3.重新提交shuffle输出丢失的stage(stage内部计算出错)给TaskScheduler
4.将 Taskset 传给底层调度器
a.– spark-cluster TaskScheduler
b.– yarn-cluster YarnClusterScheduler
c.– yarn-client YarnClientClusterScheduler
3TaskScheduler
(1)为每一个TaskSet构建一个TaskSetManager 实例管理这个TaskSet 的生命周期
(2)数据本地性决定每个Task最佳位置
(3)提交 taskset( 一组task) 到集群运行并监控
(4)推测执行,碰到计算缓慢任务需要放到别的节点上重试
(5)重新提交Shuffle输出丢失的Stage给DAGScheduler
问题:哪些 RDD 需要 cache?
会被重复使用的(但不能太大)。
问题:用户怎么设定哪些 RDD 要 cache?
因为用户只与 driver program 打交道,因此只能用 rdd.cache() 去 cache 用户能看到的 RDD。所谓能看到指的是调用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用户直接 cache 的,比如 reduceByKey() 中会生成的 ShuffledRDD、MapPartitionsRDD 是不能被用户直接 cache 的。
问题:哪些 RDD 需要 cache?
会被重复使用的(但不能太大)。
问题:用户怎么设定哪些 RDD 要 cache?
因为用户只与 driver program 打交道,因此只能用 rdd.cache() 去 cache 用户能看到的 RDD。所谓能看到指的是调用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用户直接 cache 的,比如 reduceByKey() 中会生成的 ShuffledRDD、MapPartitionsRDD 是不能被用户直接 cache 的。
问题:driver program 设定 rdd.cache() 后,系统怎么对 RDD 进行 cache?
先不看实现,自己来想象一下如何完成 cache:当 task 计算得到 RDD 的某个 partition 的第一个 record 后,就去判断该 RDD 是否要被 cache,如果要被 cache 的话,将这个 record 及后续计算的到的 records 直接丢给本地 blockManager 的 memoryStore,如果 memoryStore 存不下就交给 diskStore 存放到磁盘。
实际实现与设想的基本类似,区别在于:将要计算 RDD partition 的时候(而不是已经计算得到第一个 record 的时候)就去判断 partition 要不要被 cache。如果要被 cache 的话,先将 partition 计算出来,然后 cache 到内存。cache 只使用 memory,写磁盘的话那就叫 checkpoint 了。
调用 rdd.cache() 后, rdd 就变成 persistRDD 了,其 StorageLevel 为 MEMORY_ONLY。persistRDD 会告知 driver 说自己是需要被 persist 的。
如果用代码表示:
rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
=> key = RDDBlockId(rdd.id, split.index)
=> blockManager.get(key)
=> computedValues = rdd.computeOrReadCheckpoint(split, context)
if (isCheckpointed) firstParent[T].iterator(split, context)
else compute(split, context)
=> elements = new ArrayBuffer[Any]
=> elements ++= computedValues
=> updatedBlocks = blockManager.put(key, elements, tellMaster = true)
当 rdd.iterator() 被调用的时候,也就是要计算该 rdd 中某个 partition 的时候,会先去 cacheManager 那里领取一个 blockId,表明是要存哪个 RDD 的哪个 partition,这个 blockId 类型是 RDDBlockId(memoryStore 里面可能还存放有 task 的 result 等数据,因此 blockId 的类型是用来区分不同的数据)。然后去 blockManager 里面查看该 partition 是不是已经被 checkpoint 了,如果是,表明以前运行过该 task,那就不用计算该 partition 了,直接从 checkpoint 中读取该 partition 的所有 records 放到叫做 elements 的 ArrayBuffer 里面。如果没有被 checkpoint 过,先将 partition 计算出来,然后将其所有 records 放到 elements 里面。最后将 elements 交给 blockManager 进行 cache。
blockManager 将 elements(也就是 partition) 存放到 memoryStore 管理的 LinkedHashMap[BlockId, Entry] 里面。如果 partition 大于 memoryStore 的存储极限(默认是 60% 的 heap),那么直接返回说存不下。如果剩余空间也许能放下,会先 drop 掉一些早先被 cached 的 RDD 的 partition,为新来的 partition 腾地方,如果腾出的地方够,就把新来的 partition 放到 LinkedHashMap 里面,腾不出就返回说存不下。注意 drop 的时候不会去 drop 与新来的 partition 同属于一个 RDD 的 partition。drop 的时候先 drop 最早被 cache 的 partition。(说好的 LRU 替换算法呢?)
问题:cached RDD 怎么被读取?
下次计算(一般是同一 application 的下一个 job 计算)时如果用到 cached RDD,task 会直接去 blockManager 的 memoryStore 中读取。具体地讲,当要计算某个 rdd 中的 partition 时候(通过调用 rdd.iterator())会先去 blockManager 里面查找是否已经被 cache 了,如果 partition 被 cache 在本地,就直接使用 blockManager.getLocal() 去本地 memoryStore 里读取。如果该 partition 被其他节点上 blockManager cache 了,会通过 blockManager.getRemote() 去其他节点上读取,读取过程如下图。
获取 cached partitions 的存储位置:partition 被 cache 后所在节点上的 blockManager 会通知 driver 上的 blockMangerMasterActor 说某 rdd 的 partition 已经被我 cache 了,这个信息会存储在 blockMangerMasterActor 的 blockLocations: HashMap中。等到 task 执行需要 cached rdd 的时候,会调用 blockManagerMaster 的 getLocations(blockId) 去询问某 partition 的存储位置,这个询问信息会发到 driver 那里,driver 查询 blockLocations 获得位置信息并将信息送回。
读取其他节点上的 cached partition:task 得到 cached partition 的位置信息后,将 GetBlock(blockId) 的请求通过 connectionManager 发送到目标节点。目标节点收到请求后从本地 blockManager 那里的 memoryStore 读取 cached partition,最后发送回来。
如何理解Spark中的血统概念(RDD)(笔试重点)
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。
(★熟练)spark Core提供了几种方式创建RDD?
答:三种,
1、使用程序中的集合创建RDD;
2、使用本地文件创建RDD;
3、使用HDFS文件创建RDD。
三种创建RDD的方式的应用场景是什么?
答:1、使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。
2、使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件。
3、使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。
(★★熟练)在通过并行化集合创建RDD的时候,我们应该如何自己指定将集合切分成多少个partition呢?
答:如果要通过并行化解集合来创建RDD,需要针对程序中的集合,调用Spark Context的parallelize()方法,在用这个parallelize()这个方法的时候,有一个重要的参数可以指定,就是将集合切分成多少个partition。
spark会为每一个partition运行一个task来进行处理。spark的官方建议是,为集群中的每个CPU创建2~~4个partition。spark默认会根据集群的情况来设置partition的数据,但是也可以通过调用parallelize()方法来实现,比如parallelize(arr,10)
(★熟练)在使用本地文件和HDFS创建RDD的时候,除了text File()方法以外还有什么方法?举例说明!并说明其用处!
1、SparkContext.wholeTestFiles()方法:作用是:可以针对目录中的大量小文件进行使用,返回的值是:<filename,fileContent>组成的pairRDD。第一个返回值是文件的名称,第二个返回值是文件的内容。
2、SparkContext.sequenceFileK,V方法,作用是:可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable,Text等。
3、SparkContext.hadoopRDD()方法,对于hadoop的自定义输入类型,可以创建RDD,该方法接受JobConf、InputFormatClass、Key和Value的Class。
4、SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。
spark核心编程之--------操作RDD:(transformation和action案例实战)
(★)(常考)简述transformation算子和action算子的特性是什么?有什么优势?
答:Transformation的特点就是lazy(懒)特性。lazy特性指的是,如果一个Spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当执行了Action操作,那么所有的transformation才会执行。Spark通过这个lazy特性,来进行底层的Spark应用执行的优化,避免产生过多中间结果。
action操作的执行,会触发一个spark job的执行,从而触发这个action之前所有的transformation的执行。
(★★)spark持久化的原理!(常考)
答:要持久化一个RDD,只要调用期cache()或者是persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存到每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
cache()和persist()的主要区别在于,cache()是persisit()的一种简化方式,cache()底层就是调用的persisit()的无参版本,同时就是调用persisit(MEMEORY_ONLY),将数据持久化到内存中。如果需要从内存中清除缓存,那么可以使用unpersisit()方法。
Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了再节点失败时,避免需要重新计算整个过程。
cache()默认使用MEMORY_ONLY这个持久化策略。
(★)spark的cache()和persisit()方法使用的方式是什么?
答:cache缓存 : 或者persisit()的使用,是有规则的,
必须在transformation或者textfile等创建一个RDD之后,直接连续调用,cache()或者persisit()才可以
如果先创建一个RDD,然后单独另起一行执行cache()或者persist()方法,是没有用的而且,会报错,大量的文件丢失。
写出RDD持久化的常用的几种策略!并说一下使用的建议。
答:MEMORY_ONLY 级别:以非序列化的java对象的方式持久化在JVM内存中,如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次使用到的时候,重新被计算。
MEMORY_AND_DISK级别:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用到这些partition的时候,从磁盘中读取。RY_AND_DISK级别:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用到这些partition的时候,从磁盘中读取。
MEMORY_ONLY_SER级别:同MEMORY_ONLY,但是会使用java序列化方式,将java对象序列化以后持久化,可以减少内存的开销,但是在使用的时候需要反序列化,因此会增大CPU的开销
MEMORY_AND_DISK_SER级别:同MEMORY_AND_DSK。但是使用序列化方式持久化java对象。
DISK_ONLY级别:使用非序列化java对象的方式持久化,完全存储到磁盘中。
MEMORY_ONLY_2 等等:如果是尾部加2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次就行计算,只需要使用备份数据即可。
使用选择建议:
1、优先使用MEMORY_ONLY级别,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存的速度更快,而且没有序列化的操作,因此不需要使用CPU进行反序列化的操作。
2、如果MEMORY_ONLY无法缓存下来所有的数据的话,那么就是使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存的操作还是非常快的,只是需要消耗一些CPU。
3、如果需要进行快速的失败恢复,那么就选择后缀为_2的策略。进行数据的备份,这样失败时,就不需要在从新计算了。
4、能不使用DISK相关的策略。就不使用,有的时候,从磁盘读取还不如从新计算一遍来的更快。