Spark中常用Transformations RDD

Spark RDD Transformations

value类型

map

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

    val arr =  Array(1,2,3,4,5,6)
    val numRDD = sc.parallelize(arr)
    val resultRDD = numRDD.map(x => "(" + x*x + ")")
    resultRDD.foreach(print)
//输出结果
(1)(4)(9)(16)(25)(36)

mapPartition

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] =>Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
    arrayRDD.mapPartitions(elements => {
      val result = new ArrayBuffer[Int]
      elements.foreach(e => {
        result += e
      })
      result.iterator
    }).foreach(println)
  }
//输出结果
1
2
3
4
5
6
7
8
9

mapPartitionsWithIndex

//第二个参数指定分区数量
val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
//方法中有一个Int类型参数,指分区号;一个迭代器类型,保存分区内所有元素
    arrayRDD.mapPartitionsWithIndex((index,elements) => {
      println("partition index:" + index)
      val result = new ArrayBuffer[Int]
      elements.foreach(e => {
        result += e
      })
      result.iterator
    }).foreach(println)

输出结果

partition index:0
1
2
3
partition index:1
4
5
6
partition index:2
7
8
9

flatMap

flatMap根据切分的规则将集合内各元素切割,压平成无数个新的元素保存在一个集合中。

val words = Array("hello python","hello hadoop","hello spark")
    val wordRDD = sc.parallelize(words)
    wordRDD.flatMap(_.split(" ")).collect.foreach(println)
//输出结果
hello
python
hello
hadoop
hello
spark

map和mapPartition()的区别

  • map每次处理一条数据
  • mapPartition每次处理一个分区的数据,这个分区的数据处理完后,元RDD中分区的数据才能释放,可能导致OOM(内存溢出)
  • 当内存空间较大的时候使用mapPartition,可以提高处理效率

filter

将集合按照指定的规则过滤,将符合过滤规则的元素返回到新的集合中。

val arr = Array(1,2,3,4,5,6)
    val numRDD = sc.parallelize(arr)
    val resultRDD = numRDD.filter(_%2==0)
    resultRDD.foreach(println)
//输出结果
2
4
6

glom

每个分区对应一个数组,数组保存该分区中的所有元素,形成新的RDD类型RDD[Array[T]]

val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
arrayRDD.glom().collect.foreach(println)
//输出结果
[I@340b7ef6
[I@30404dba
[I@6050462a

可以看到三个数组对应的地址值。

groupBy 分组

分组,条件中的返回值作为key,将key值相同的元素放在同一个组里

返回值是k,v形式,其中V是一个集合的形式

val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
arrayRDD.groupBy(x => x%2).collect.foreach(println)
//输出结果
(0,CompactBuffer(2, 4, 6, 8))
(1,CompactBuffer(1, 3, 5, 7, 9))

sample(withReplacement,fraction, seed) 抽样

以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。

当选择不放回抽样时,fraction参数值必须在[0,1]之间。假如数据集中有100个元素,当fraction值为0.1时,抽样的个数大概会在10个

当选择放回抽样时,fraction参数值表示的意思是数据集中每个元素期望被抽出来的次数(放回抽样的底层原理是泊松分布和伯努利分布),具体可以参考一下源码。

seed参数可以不指定,如果要指定建议使用System.currentTimeMillis(当前时间戳)。

    val arrayRDD = sc.parallelize(1 to 10)
    arrayRDD.sample(false,0.2).foreach(println)
//输出结果
2
5
8

distinct去重

val arrayRDD = sc.parallelize(Array(1,1,1,2,2,2,3,3,3,4,4,4))
val resultRDD =  arrayRDD.distinct.collect
println(resultRDD.mkString(","))
//输出结果
2
5
8

coalesce缩减分区数

窄依赖,可以选择是否经过shuffle,默认不经过shuffle阶段,若指定经过shuffle阶段则功能和repartition类似

作用:经过大数据集的过滤后,减少分区数,提高小数据集效率

//初始化RDD,设置四个分区
val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val coaRDD = arrRDD.coalesce(2)
val resultRDD = coaRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
    resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,2)(0,3)(0,4)(1,5)(1,6)(1,7)(1,8)

将分区减少为两个,可以看出结果未经过shuffle阶段,将数据集前四个元素放到第一个分区,将后四个元素放到第二个分区。

试着将缩减分区数改为10(大于原分区数)

val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val coaRDD = arrRDD.coalesce(10)
val resultRDD = coaRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,2)(1,3)(1,4)(2,5)(2,6)(3,7)(3,8)

可以看到分区数还是四个,说明coalesce在默认情况下无法扩展分区数

repartition 重分区

宽依赖,经过shuffle阶段,可以扩大或者缩减分区数

val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val repRDD = arrRDD.repartition(2)
val resultRDD = repRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,4)(0,5)(0,7)(1,2)(1,3)(1,6)(1,8)

将分区数指定为两个,可以看到输出结果中元素没有按照原有顺序写入分区,说那个repartition重分区经过了shuffle阶段。

将分区数改为6,再看输出结果。

(0,4)(1,1)(2,2)(2,7)(3,8)(4,5)(5,3)(5,6)

分区数变成六个。

coalesce和repartition的区别

下面是repartition的源码,可以看出repartiton底层是调用coalesce完成的,并且指定开启shuffle。

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

sortBy(func,[ascending],[numTasks])

  • sortBy根据指定的func返回值进行排序
  • ascending是布尔型变量。默认为true,表示升序排序。false为降序排序
  • 最后一个 参数用来指定分区数量
val arrRDD = sc.parallelize(Array(2,1,6,3,7,5,8,4))arrRDD.sortBy(x => x).collect.foreach(print)
//输出结果
12345678

pipe

管道,针对每个分区都执行一遍shell脚本,返回输出的RDD

脚本需要放在Worker节点能找到的位置

双Value类型交互

union

可以将两个rdd中的元素合并

val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.union(rdd2).collect.foreach(print)
//输出结果
1235724568

subtract

求两个rdd的差集。

差集:rdd1 - (rdd1 ∩ rdd2)

val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.subtract(rdd2).collect.foreach(println)
//输出结果
1
3
7

intersection

求交集

val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.intersection(rdd2).collect.foreach(println)
//输出结果
5
2

cartesian

笛卡尔积

val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.cartesian(rdd2).collect.foreach(print)
//输出结果
(1,2)(1,4)(1,5)(1,6)(1,8)(2,2)(2,4)(2,5)(2,6)(2,8)(3,2)(3,4)(3,5)(3,6)(3,8)(5,2)(5,4)(5,5)(5,6)(5,8)(7,2)(7,4)(7,5)(7,6)(7,8)

zip

将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.zip(rdd2).collect.foreach(print)
//输出结果
(1,2)(2,4)(3,5)(5,6)(7,8)

KeyValue类型

partitonBy

与repartition的区别:partitonBy可以修改分区器与分区个数,例如:哈希分区。

val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
println(rdd.partitionBy(new HashPartitioner(2)).partitions.size)

输出结果:

2

reduceByKey

将相同的key值聚合到一起,value自定义操作

val rdd = sc.parallelize(Array(("amy",1),("Bob",1),("Cindy",1),("amy",1),("amy",1),("Bob",1)))
val reduceRDD = rdd.reduceByKey((x,y) => x+y )
reduceRDD.collect.foreach(println)

结果:

(amy,3)
(Cindy,1)
(Bob,2)

groupByKey(K,Iterable)

按照key值进行分组,将value值放到迭代器中。

val rdd = sc.parallelize(Array(("amy",1),("Bob",1),("Cindy",1),("amy",1),("amy",1),("Bob",1)))
val reduceRDD = rdd.groupByKey().collect.foreach(println)

结果:

(amy,CompactBuffer(1, 1, 1))
(Cindy,CompactBuffer(1))
(Bob,CompactBuffer(1, 1))

reduceByKey和groupByKey的区别

  1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].

  2. groupByKey:按照key进行分组,直接进行shuffle。

  3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

aggregateBybyKey

作用:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数描述:

  • zeroValue:给每一个分区中的每一个key一个初始值;

  • seqOp:函数用于在每一个分区中用初始值逐步迭代value;

  • combOp:函数用于合并每个分区中的结果。

源码:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }

从源码中可以看出,在各个分区里面zeroValue的U值传入啊seqOp与原始数据集的V进行运算,将运算结果作为U再次传入seqOp。

而各个分区互相之间在利用combOp函数进行聚合操作,最终可得到结果。

    val data = List((1, 3), (1, 4), (2, 3), (3, 6), (1, 2), (3, 8))
    val rdd = sc.parallelize(data, 3)
    rdd.aggregateByKey(0)(math.max(_,_), _ + _).collect.foreach(println)

结果:

(3,14)
(1,6)
(2,3)

foldByKey

处于reduceByKey与aggregateByKey之间,可以指定初始值,但是无法将分区内和分区之间的两个操作自定义执行。

val data = List(("amy",1),("amy",1), ("Bob",1),("Bob",1), ("Bob",1), ("Cindy",1))
val rdd = sc.parallelize(data, 2)
rdd.foldByKey(0)(_+_).collect.foreach(println)

结果:

(amy,2)
(Cindy,1)
(Bob,3)

combineByKey

对于相同的key,将value合并并聚合

//源码
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

从源码中可以看出函数执行分为三个阶段,详见下方案例。

案例:求相同key对应的所有value的平均值

val result = rdd.combineByKey(
      //1.createCombiner,将原数据集中读取的value值
      //根据需求转换为指定的格式,这里转换为一个元组
      (_, 1),
      //2.mergeValue,将读取进来的相同key下的value做累加
      //求平均值,所以第一个值用来求和,
      //第二个值用来记录总数,
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      //3.mergeCombiners,将不同分区之间的数据累加
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).collect

    /**
      * rdd的输出结果,第一个值为和,第二个值为总数
      * (b,(74,3))
      * (a,(35,2))
      * (c,(26,1))
      */

    result.map(x => (x._1,x._2._1 / x._2._2)).foreach(println)

结果:

(b,24)
(a,17)
(c,26)

sortByKey

根据key值进行排序,其中key值必须为可排序的类型。排序结束返回一个RDD。

val data = List(("b", 26),("a", 10), ("c", 26), ("a", 25), ("b", 15) , ("b", 33))
    val rdd = sc.parallelize(data, 2)
    val result = rdd.sortByKey().collect.foreach(println)

结果:

(a,10)
(a,25)
(b,26)
(b,15)
(b,33)
(c,26)

mapValues

与map的功能类似,不过mapValues是针对键值对的数据类型且对其中的value进行操作。

val data = List(("b", 1),("a", 1), ("c", 1), ("a", 1), ("b", 1) , ("b", 1))
    val rdd = sc.parallelize(data, 2)
    val result = rdd.mapValues(x => x+1).collect.foreach(println)

结果:

(b,2)
(a,2)
(c,2)
(a,2)
(b,2)
(b,2)

join

当两个RDD的key值相同时,使用join将两个RDD的value聚合,返回结果为(key,(value1,value2))

val data1 = List(("a",1),("b",1),("c",1))
    val data2 = List(("a",2),("b",2),("c",2))

    val rdd1 = sc.parallelize(data1)
    val rdd2 = sc.parallelize(data2)

    rdd1.join(rdd2).collect.foreach(x => print(x + ";"))

结果:

(a,(1,2));(b,(1,2));(c,(1,2));

cogroup

与join类似,输出格式为(K,(Iterable<V>,Iterable<W>)),指将两个rdd相同key下的value放到相对应的迭代器中。

    val data1 = List(("a",1),("b",1),("c",1))
    val data2 = List(("a",2),("a",3),("b",2),("c",2))

    val rdd1 = sc.parallelize(data1)
    val rdd2 = sc.parallelize(data2)

    rdd1.cogroup(rdd2).collect.foreach(println)

结果:

(a,(CompactBuffer(1),CompactBuffer(2, 3)))
(b,(CompactBuffer(1),CompactBuffer(2)))
(c,(CompactBuffer(1),CompactBuffer(2)))

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容