RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的属性
- 一组分区(Partition),即数据集的基本组成单位;
- 一个计算每个分区的函数;
- RDD之间的依赖关系;
- 一个Partitioner,即RDD的分片函数;
- 一个列表,存储存取每个Partition的优先位置(preferred location)。
RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
只读
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了,如下图所示。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中
依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系
缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用
如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
CheckPoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
RDD编程
编程模型
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务
RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建
从集合中创建
从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
1)使用parallelize()从集合创建
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2)使用makeRDD()从集合创建
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
由外部存储系统的数据集创建
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,我们会在第4章详细介绍。
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
RDD整体上分为Value类型和Key-Value类型
Value类型
map(func)案例
- 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
- 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
(1)创建
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2)打印
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)将所有元素*2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4)打印最终结果
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
object Spark_RDD {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("Spark_RDD").set("spark.testing.memory","2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
//创建rdd,从内存中创建makeRDD,底层就是parallelize
val listRDD: RDD[Int] = sc.makeRDD(List(1,2,5,4))
listRDD.collect().foreach(println)
//从内存中创建parallelize
val arrayRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))
arrayRDD.collect().foreach(println)
//从外部存储创建,默认项目路径,也可以改为hdfs路径hdfs://hadoop102:9000/xxx
//读取文件时,传递的参数为最小分区数,但是不一定是这个分区,取决于hadoop分片规则
val fileRDD: RDD[String] = sc.textFile("in",2)
fileRDD.foreach(println)
//将内存创建RDD数据保存到文件,8核,四个数据放八个分区
// listRDD.saveAsTextFile("output")
fileRDD.saveAsTextFile("output")
}
}
map
map(func)案例
- 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
- 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
object Map {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("Map").set("spark.testing.memory","2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
/*创建一个1-10数组的RDD,将所有元素*2形成新的RDD*/
//map操作在Executor,其他的操作在driver,executor如果要用到driver中的数据需要支持序列化
val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
val mapRDD: RDD[Int] = listRdd.map(x => x*2)
/*拼接*/
val data = Array(1, 2, 3, 4, 5)
sc.makeRDD(data).map(_ + "test").collect().foreach(println)
mapRDD.collect().foreach(println)
}
}
mapPartitions(func) 案例
- 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
- 需求:创建一个RDD,使每个元素*2组成新的RDD
object MapPartitions {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("MapPartitions").set("spark.testing.memory","2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
/*创建一个1-10数组的RDD*/
val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
//mappartition对一个rdd中所有的分区进行遍历
//优于map算子,减少发到执行器的交互次数
//可能内存溢出
val partitions: RDD[Int] = listRdd.mapPartitions(data=>{data.map(data=>data*2)})
partitions.collect.foreach(println)
}
}
flatMap(func) 案例
- 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
- 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的2倍(2,4,6,8,10)
object FlatMap {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("FlatMap").set("spark.testing.memory","2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
//(Array(1,2),(Array(2,3)===>1,2,2,3
val listRdd: RDD[Array[Int]] = sc.makeRDD(List(Array(1,2),(Array(2,3))))
val mapInfo: RDD[Int] = listRdd.flatMap(datas=>datas)
mapInfo.collect().foreach(println)
}
}
map()和mapPartition()的区别
- map():每次处理一条数据。
- mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
- 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率
glom案例
- 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
- 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组
object Glom {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("Glom").set("spark.testing.memory","2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
//glom算子,分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
val makeRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),3)
val glomRDD: RDD[Array[Int]] = makeRDD.glom()
glomRDD.collect().foreach(array=>{
println(array)
// println(array.mkString(","))
})
}
}
groupBy(func)案例
- 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
- 需求:创建一个RDD,按照元素模以2的值进行分组。
object GroupBy {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("GroupBy").set("spark.testing.memory", "2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
//按照制定规则进行分组,分组后的数据形成元组,kv
val groupRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val groupByRDD: RDD[(Int, Iterable[Int])] = groupRDD.groupBy(_%2)
groupByRDD.collect().foreach(println)
}
}
(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))
- 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
- 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
object Filter {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("Filter").set("spark.testing.memory", "2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
//按照制定规则进行分组,分组后的数据形成元组,kv
val fil: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val filterRDD: RDD[Int] = fil.filter(_%2==1)
filterRDD.collect().foreach(println)
}
}
sample(withReplacement, fraction, seed) 案例
- 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
object Sample {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("Sample").set("spark.testing.memory", "2147480000")
//创建spark上下文对象
val sc = new SparkContext(config)
val rdd: RDD[String] = sc.makeRDD(Array("hello1","hello1","hello2","hello3","hello4","hello5","hello6","hello1","hello1","hello2","hello3"))
val sampleRDD: RDD[String] = rdd.sample(false,0.7)
sampleRDD.foreach(println)
}
}
coalesce(numPartitions) 案例
- 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
object Coalesce {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Coalesce").set("spark.testing.memory", "2147480000")
val sc = new SparkContext(conf)
val makeRdd: RDD[Int] = sc.makeRDD(1 to 16,4)
//缩减分区数
val coalesceRdd: RDD[Int] = makeRdd.coalesce(3)
coalesceRdd.saveAsTextFile("output")
println(coalesceRdd.partitions.size)
}
}
repartition(numPartitions) 案例
- 作用:根据分区数,重新通过网络随机洗牌所有数据。
- 需求:创建一个4个分区的RDD,对其重新分区
object Repartition {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Distinct").set("spark.testing.memory", "2147480000")
val sc = new SparkContext(conf)
val parallelize: RDD[Int] = sc.parallelize(1 to 16,4)
println(parallelize.partitions.size)
val rerdd = parallelize.repartition(2)
println(rerdd.partitions.size)
val glom: RDD[Array[Int]] = rerdd.glom()
glom.collect().foreach(array=>{
println(array.mkString(","))
})
}
}
coalesce和repartition的区别
- coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
- repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true)}
aggregateByKey案例
参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
- 作用:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
- 参数描述:
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。 - 需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
-
需求分析
image.png
object AggregateByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Coalesce").set("spark.testing.memory", "2147480000")
val sc = new SparkContext(conf)
//aggregateByKeyRDD算子
val rdd = sc.parallelize(List(("a",3),("c",4),("b",3),("c",6),("c",8),("a",2)),2)
val aggregateByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_,_),_+_)
aggregateByKeyRDD.collect().foreach(println)
}
}