SparkCore之RDD

RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的属性

  1. 一组分区(Partition),即数据集的基本组成单位;
  2. 一个计算每个分区的函数;
  3. RDD之间的依赖关系;
  4. 一个Partitioner,即RDD的分片函数;
  5. 一个列表,存储存取每个Partition的优先位置(preferred location)。

RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

image.png

只读
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。

image.png

由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了,如下图所示。

image.png

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中

依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系

image.png

缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用

如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。


image.png

CheckPoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。

RDD编程

编程模型
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务

image.png
image.png

RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建

从集合中创建
从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
1)使用parallelize()从集合创建
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2)使用makeRDD()从集合创建
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

由外部存储系统的数据集创建
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,我们会在第4章详细介绍。
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

RDD整体上分为Value类型和Key-Value类型
Value类型
map(func)案例

  1. 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
  2. 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD

(1)创建
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2)打印
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)将所有元素*2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4)打印最终结果
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

object Spark_RDD {
  def main(args: Array[String]): Unit = {

    val config = new SparkConf().setMaster("local[*]").setAppName("Spark_RDD").set("spark.testing.memory","2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    //创建rdd,从内存中创建makeRDD,底层就是parallelize
    val listRDD: RDD[Int] = sc.makeRDD(List(1,2,5,4))
    listRDD.collect().foreach(println)
    //从内存中创建parallelize
    val arrayRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))

    arrayRDD.collect().foreach(println)


    //从外部存储创建,默认项目路径,也可以改为hdfs路径hdfs://hadoop102:9000/xxx
    //读取文件时,传递的参数为最小分区数,但是不一定是这个分区,取决于hadoop分片规则
    val fileRDD: RDD[String] = sc.textFile("in",2)
    fileRDD.foreach(println)

    //将内存创建RDD数据保存到文件,8核,四个数据放八个分区
   // listRDD.saveAsTextFile("output")
    fileRDD.saveAsTextFile("output")
  }

}

map
map(func)案例

  1. 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
  2. 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
object Map {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("Map").set("spark.testing.memory","2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    /*创建一个1-10数组的RDD,将所有元素*2形成新的RDD*/
    //map操作在Executor,其他的操作在driver,executor如果要用到driver中的数据需要支持序列化
    val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
    val mapRDD: RDD[Int] = listRdd.map(x => x*2)

    /*拼接*/
    val data = Array(1, 2, 3, 4, 5)

    sc.makeRDD(data).map(_ + "test").collect().foreach(println)
    mapRDD.collect().foreach(println)

  }

}

mapPartitions(func) 案例

  1. 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
  2. 需求:创建一个RDD,使每个元素*2组成新的RDD
object MapPartitions {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("MapPartitions").set("spark.testing.memory","2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    /*创建一个1-10数组的RDD*/
    val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
    //mappartition对一个rdd中所有的分区进行遍历
    //优于map算子,减少发到执行器的交互次数
    //可能内存溢出
    val partitions: RDD[Int] = listRdd.mapPartitions(data=>{data.map(data=>data*2)})
    partitions.collect.foreach(println)


  }

}

flatMap(func) 案例

  1. 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
  2. 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的2倍(2,4,6,8,10)
object FlatMap {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("FlatMap").set("spark.testing.memory","2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    //(Array(1,2),(Array(2,3)===>1,2,2,3
    val listRdd: RDD[Array[Int]] = sc.makeRDD(List(Array(1,2),(Array(2,3))))
    val mapInfo: RDD[Int] = listRdd.flatMap(datas=>datas)
    mapInfo.collect().foreach(println)
  }

}

map()和mapPartition()的区别

  1. map():每次处理一条数据。
  2. mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
  3. 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率

glom案例

  1. 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
  2. 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组
object Glom {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("Glom").set("spark.testing.memory","2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    //glom算子,分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
    val makeRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),3)
    val glomRDD: RDD[Array[Int]] = makeRDD.glom()
    glomRDD.collect().foreach(array=>{
      println(array)
      // println(array.mkString(","))
    })
  }

}

groupBy(func)案例

  1. 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
  2. 需求:创建一个RDD,按照元素模以2的值进行分组。
object GroupBy {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("GroupBy").set("spark.testing.memory", "2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    //按照制定规则进行分组,分组后的数据形成元组,kv
    val groupRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    val groupByRDD: RDD[(Int, Iterable[Int])] = groupRDD.groupBy(_%2)
    groupByRDD.collect().foreach(println)

  }

}

(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))
  1. 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
  2. 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
object Filter {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("Filter").set("spark.testing.memory", "2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    //按照制定规则进行分组,分组后的数据形成元组,kv
    val fil: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val filterRDD: RDD[Int] = fil.filter(_%2==1)
    filterRDD.collect().foreach(println)


  }

}

sample(withReplacement, fraction, seed) 案例

  1. 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
object Sample {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("Sample").set("spark.testing.memory", "2147480000")
    //创建spark上下文对象
    val sc = new SparkContext(config)
    val rdd: RDD[String] = sc.makeRDD(Array("hello1","hello1","hello2","hello3","hello4","hello5","hello6","hello1","hello1","hello2","hello3"))
    val sampleRDD: RDD[String] = rdd.sample(false,0.7)
    sampleRDD.foreach(println)
  }

}

coalesce(numPartitions) 案例

  1. 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
object Coalesce {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Coalesce").set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    val makeRdd: RDD[Int] = sc.makeRDD(1 to 16,4)
    //缩减分区数
    val coalesceRdd: RDD[Int] = makeRdd.coalesce(3)
    coalesceRdd.saveAsTextFile("output")
    println(coalesceRdd.partitions.size)
  }

}

repartition(numPartitions) 案例

  1. 作用:根据分区数,重新通过网络随机洗牌所有数据。
  2. 需求:创建一个4个分区的RDD,对其重新分区
object Repartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Distinct").set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    val parallelize: RDD[Int] = sc.parallelize(1 to 16,4)
    println(parallelize.partitions.size)
    val rerdd = parallelize.repartition(2)
    println(rerdd.partitions.size)
    val glom: RDD[Array[Int]] = rerdd.glom()
    glom.collect().foreach(array=>{
      println(array.mkString(","))
    })

  }

}

coalesce和repartition的区别

  1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  2. repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下:
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true)}

aggregateByKey案例
参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

  1. 作用:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
  2. 参数描述:
    (1)zeroValue:给每一个分区中的每一个key一个初始值;
    (2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
    (3)combOp:函数用于合并每个分区中的结果。
  3. 需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
  4. 需求分析


    image.png
object AggregateByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Coalesce").set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    //aggregateByKeyRDD算子
    val rdd = sc.parallelize(List(("a",3),("c",4),("b",3),("c",6),("c",8),("a",2)),2)
    val aggregateByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_,_),_+_)
    aggregateByKeyRDD.collect().foreach(println)
  }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 223,507评论 6 521
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 95,626评论 3 401
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 170,466评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,456评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,473评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,016评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,409评论 3 426
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,370评论 0 278
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,909评论 1 322
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,960评论 3 343
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,102评论 1 354
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,754评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,428评论 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,917评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,036评论 1 275
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,582评论 3 380
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,126评论 2 362

推荐阅读更多精彩内容