spark scala RDD基本操作

弹性分布式数据集(Resilient Distributed Dataset-RDD)
我使用spark比较晚,所以我使用dataframe比较多,听说rdd这块以后spark也停止更新了,但是目前dataframe还是不如rdd灵活,而且spark SQL一些方法不大稳定,有一些rdd的技巧还是要继续使用。
下面是整理的常用操作,一部分来自书里,一部分是自己整理的。

1.创建RDD

两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

1.Scala 中的parallelize()方法,直接把程序中一个已有的集合传给这个方法就可以生成一个RDD。
val lines = sc.parallelize(List("pandas","i like pandas"))
val lines = sc.parallelize(Array(1,2,3,4,5,6))
用case class来规范数据类型

case class UserData(userID:String,userName:String,userAddress:String)
val lines = sc.parallelize(
                                      List("1234","pandas","chengdu"),
                                      List("4321","tiger","jiling"),
                                      List("1111","bird","china")
)

2.更常用的方式是从外部存储中读取数据来创建RDD。
val lines = sc.textFile("D://wenben.txt")
3.使用makeRDD来构造(不建议使用)
val rdd01 = sc.makeRDD(List((1,2,null,null),(1,2,3,4),(4,3,null,null)))

2.RDD操作

RDD支持两种操作:转化操作和行动操作
转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

2.1.转化操作(lazy模式):

最常用的转化操作是map()和filter(),inputRDD{1,2,3,4}
inputRDD.map(x => x * x)
Mapped RDD{1,4,9,16}
把列类型转化为字符串,map中通常使用v(0),索引从0开始
inputRDD.map(v => (v(0).toString,v(1).toString))

inputRDD.filter(x => x! = 1)
Filtered RDD{2,3,4}
filter筛选条件为,第一列字符串长度大于10,第二列字符串长度大于5。这里索引从1开始。
inputRDD.filter(v => v._1.length > 10 && v._2.length > 5)

flatMap和map操作

flatMap()相当于把map的作用的数组拆散再合并。

import org.apache.spark.{SparkConf, SparkContext}

object MapAndFlatMap {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("map_flatMap_demo").setMaster("local"))
    val arrayRDD =sc.parallelize(Array("a_b","c_d","e_f"))
    arrayRDD.foreach(println)
/*
结果 :
a_b
c_d
e_f
*/

    arrayRDD.map(string=>{
      string.split("_")
    }).foreach(x=>{
      println(x.mkString(",")) 
    })
/*
结果:
a,b
c,d
e,f
所以map得到的RDD结果是 Array(Array("a","b"),Array("c","d"),Array("e","f"))
*/
    arrayRDD.flatMap(string=>{
      string.split("_")
    }).foreach(x=>{
      println(x.mkString(","))//打印结果3
    })
/*
结果:
a
b
c
d
e
f
所以flatMap得到的RDD结果是Array("a","b","c","d","e","f")
*/
  }
}

sample操作

sample操作有两个参数,第一个参数是代表采样是有放回还是无放回,第二个参数代表抽样的比例。
1、withReplacement:元素可以多次抽样(在抽样时替换)
2、fraction:期望样本的大小作为RDD大小的一部分,
当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
3、seed:随机数生成器的种子
例子:
1.元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(6,2,8,4));
JavaRDD<Integer> res = rdd.sample(false, 0.5);
res.foreach(x -> System.out.print(x +" "));

//结果:2 8 4 

2.元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(6,2,8,4));
JavaRDD<Integer> res = rdd.sample(true, 2);
res.foreach(x -> System.out.print(x +" "));

//结果:6 2 8 8 8 8 8 4

map和mapPartitions

(1)、使用map(func())遍历
map相当于遍历,遍历1000行的表,就要调用func一千次。

(2)、使用mapPartitions(func())遍历
mapPartition中func()仅仅被调用分区数量的次数,例如10个分区,仅仅调用10次。假如函数内部存在分词词库导入关闭,数据库链接等等,使用map每调用func()一次都要跑一遍这些操作,严重影响性能。
这时候就需要把map改写成mapPartitions
改写方式也很简单:
例如可以用一下map,几乎不做任何操作:

    var data_rdd = df.rdd.map{x => {
      val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
      (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
      }}

改写成mapPatitions需要使用一下迭代器:

   val data_rdd = df.rdd.repartition(200).mapPartitions(iter => for (x <- iter) yield {
      val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
      (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
    })

zip和zipPartitions

zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
 
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21
 
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21
 
scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           
 
scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
 
scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
 
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常

zipPartitions

zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

该函数有好几种实现,可分为三类:

参数是一个RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21
 
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21
 
//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{
     |         (x,iter) => {
     |           var result = List[String]()
     |             while(iter.hasNext){
     |               result ::= ("part_" + x + "|" + iter.next())
     |             }
     |             result.iterator
     |            
     |         }
     |       }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
 
//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{
     |         (x,iter) => {
     |           var result = List[String]()
     |             while(iter.hasNext){
     |               result ::= ("part_" + x + "|" + iter.next())
     |             }
     |             result.iterator
     |            
     |         }
     |       }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
 
//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){
     |       (rdd1Iter,rdd2Iter) => {
     |         var result = List[String]()
     |         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
     |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
     |         }
     |         result.iterator
     |       }
     |     }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)

参数是两个RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21
 
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21
 
scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21
 
//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{
     |         (x,iter) => {
     |           var result = List[String]()
     |             while(iter.hasNext){
     |               result ::= ("part_" + x + "|" + iter.next())
     |             }
     |             result.iterator
     |            
     |         }
     |       }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
 
//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
     |       (rdd1Iter,rdd2Iter,rdd3Iter) => {
     |         var result = List[String]()
     |         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
     |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
     |         }
     |         result.iterator
     |       }
     |     }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27
 
scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

参数是三个RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

用法同上面,只不过这里又多了个一个RDD而已。

zipWithIndex、zipWithUniqueId

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
 
scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

看下面的例子:

scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
//rdd1有两个分区,
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//总分区数为2
//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5

其他常用转化操作

1.对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

函数名 目的 示例 结果
map() 函数应用于rdd中每个元素 rdd.map(x => x+1) {2,3,4,4}
flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词,执行扁平化操作。 rdd.flatMap(x => x.to(3)) {1,2,3,2,3,3,3}
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x => x!=1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对RDD采样,以及是否替换,抽取比例等等 rdd.sample(false,0.5) {1,2,3} 非确定的

2.对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

函数名 目的 示例 结果
union() 生成一个包含两个RDD中所有元素的RDD (不去重的并集) rdd.union(other) {1,2,3,3,4,5}
intersection() 求两个RDD共同的元素的RDD (交集) rdd.intersection(other) {3}
subtract() 求移除一个RDD中的内容 (差集) rdd.subtract(other) {1,2}
cartesian() 笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),...,(3,5)}

2.2.行动操作:

函数名 目的 示例 结果
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
top(num) 从RDD中返回最前面的num个元素 rdd.top(2) {3,3}
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSampe(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的
reduce(func) 并行整合RDD中所有数据 rdd.reduce((x,y) => x+y) 9
fold(zero)(func) 和reduce类似,但是需要提供初始值 rdd.fold(0)((x,y) => x+y) 9
aggregate(zeroValue)(seqOp,comOp) 和reduce相似,但是通常返回不同类型的函数 rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func) 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值

first操作

返回RDD中第一个元素

collect操作

以集合形式返回RDD的元素(常用语小的数据集)
比如某个文件数据

中国
美国
加拿大
……

读取的时候希望把文件中的元素放到列表里

import scala.collection.mutable.ArrayBuffer
val stat_array = new ArrayBuffer[String]()
rdd.collect.foreach(v => (stat_array += v))

take操作

take(num:Int):将RDD作为集合,返回集合中[0,num-1]下标的元素

reduce操作

reduce(f:(T,T)=>T):对RDD中元素进行二元计算,返回计算结果

top操作

top(num:Int):按照默认的或者是指定的排序规则,返回前num个元素

takeOrdered操作

takeOrdered(num:Int):和top相反,返回前num个元素

foreach和map的区别:

foreach是action操作,它没有返回值,通常用于print或者写入文件等操作。
map是transform操作(lazy),生成一个新的rdd

sortBy函数(在org.apache.spark.rdd.RDD类中)

共三个参数:
  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
  第二个参数是ascending,该参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等

例如:
data: (item_num, item_id, item_name)
val data_sorted = data.sortBy(_._1, ascending = false) 根据item_num从大到小排列(降序)。

2.3.键值对操作

2.3.1.partitionBy,mapValues,flatMapValues

partitionBy,mapValues,flatMapValues和基本转换操作中的repatition,map和flatMap功能类似。
partitionBy接口根据partitioner函数生成新的ShuffledRDD,将原RDD重新分区(在repartition中也是先将RDD[T]转化成RDD[K,V],这里V是null,然后使用RDD[K,V]作为函数生成ShuffledRDD)。mapValues和flatMapValues针对[K,V]中的V值进行map操作和flatMap操作。
使用partitionBy,mapValues,flatMapValues不会破坏原数据的partition的结构&信息,使用repatition,map和flatMap后还需要做Shuffle,数据就不带有原先的partition信息。所以键值对操作尽量使用partitionBy,mapValues,flatMapValues,少用repatition,map和flatMap。



2.3.2.combineByKey,foldByKey,reduceBykey,groupByKey

四种键值对转换操作都是针对RDD[K,V]本身,不涉及与其他RDD的组合操作,四种操作类型最终都会归结为堆combinByKey的调用。combineByKey接口是将RDD[K,V]转化成返回类型RDD[K,C],这里V类型与C类型可以相同也可以不同。
groupbykey效率很低,尽量使用reducebykey和combinebykey来代替groupbykey

reduceByKey函数

reduceByKey(_+_) 相当于 reduceByKey((a,b)=>(a+b))
reduceByKey(_++_) 相当于 reduceByKey((a,b)=>(a++b))这里a和b 是一个list,++用于合并列表

combineByKey函数

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
numPartitions:结果RDD分区数,默认保持原有的分区数
partitioner:分区函数,默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

举例理解:

假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:

1 定义我们需要什么样的果汁。
2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。--相当于hadoop中的local combiner
3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。--相当于全局进行combiner

那么对比上述三步,combineByKey的三个函数也就是这三个功能
1 createCombiner就是定义了v如何转换为c
2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
3 就是定义了如何将相同key下的C给合并成一个C

var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

rdd1.combineByKey(
(v : Int) => List(v),             --将1 转换成 list(1)
(c : List[Int], v : Int) => v :: c,       --将list(1)和2进行组合从而转换成list(1,2)
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2  --将全局相同的key的value进行组合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

简要介绍
def combineByKey[C](createCombiner: (V) => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RD
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就
和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更
多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各
个分区的结果进行合并。

创建一个学生成绩说明的类
case class ScoreDetail(studentName: String, subject: String, score: Float)
下面是一些测试数据,加载测试数据集合 key = Students name and value = ScoreDetail instance

    val scores = List(
      ScoreDetail("xiaoming", "Math", 98),
      ScoreDetail("xiaoming", "English", 88),
      ScoreDetail("wangwu", "Math", 75),
      ScoreDetail("wangwu", "English", 78),
      ScoreDetail("lihua", "Math", 90),
      ScoreDetail("lihua", "English", 80),
      ScoreDetail("zhangsan", "Math", 91),
      ScoreDetail("zhangsan", "English", 80))

换成二元组, 也可以理解成转换成一个map, 利用了for 和 yield的组合

val scoresWithKey = for { i <- scores } yield (i.studentName, i)
val scoresWithKeyRDD = sc.parallelize(scoresWithKey).partitionBy(new org.apache.spark.HashPartitioner(3)).cache

聚合求平均值让后打印

      val avgScoresRDD = scoresWithKeyRDD.combineByKey(
      (x: ScoreDetail) => (x.score, 1)                     /*createCombiner*/,
      (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
      (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
      // calculate the average
    ).map( { case(key, value) => (key, value._1/value._2) })
 
    avgScoresRDD.collect.foreach(println)
/*输出:
(zhangsan,85.5)
(lihua,85.0)
(xiaoming,93.0)
(wangwu,76.5)
*/

解释一下scoresWithKeyRDD.combineByKey
createCombiner: (x: ScoreDetail) => (x.score, 1)
这是第一次遇到zhangsan,创建一个函数,把map中的value转成另外一个类型 ,这里是把(zhangsan,(ScoreDetail类))转换成(zhangsan,(91,1))
mergeValue: (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) 再次碰到张三, 就把这两个合并, 这里是将(zhangsan,(91,1)) 这种类型 和 (zhangsan,(ScoreDetail类))这种类型合并,合并成了(zhangsan,(171,2))
mergeCombiners (acc1: (Float, Int), acc2: (Float, Int)) 这个是将多个分区中的zhangsan的数据进行合并, 我们这里zhansan在同一个分区,这个地方就没有用上


contRdd.combineByKey(
(score:(String,Long)) => Map(score._1 -> score._2),
(c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)),
(c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )

topk问题:
https://www.twblogs.net/a/5c602891bd9eee06ef371458/zh-cn

2.4 控制操作

2.4.1. cache,persist
cache底层是调用了persist。
2.4.2. checkpoint

3 例子:

3.1 join或者其他消耗较大的处理前先进行聚合操作

join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。

假设,你有一个RDD存着(熊猫id,分数),另外一个RDD存着(熊猫id,邮箱地址)。若你想给每只可爱的熊猫的邮箱发送她所得的最高的分数,你可以将RDD根据id进行join,然后计算最高的分数,如下:

def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val joinedRDD = scoreRDD.join(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}

然而,这可能不会比先减少分数数据的方案快。先计算最高的分数,那么每个熊猫的分数数据就只有一行,接下来再join地址数据:

def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
    bestScoreData.join(addressRDD)
}

若每个熊猫有1000个不同的分数,那么这种做法的shuffle量就比上一种的小了1000倍。

Reference:

https://blog.csdn.net/weixin_42181200/article/details/81201864
http://lxw1234.com/archives/2015/07/350.htm
【Spark快速大数据分析】
【Spark SQL入门与实践指南】

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • http://spark.apache.org/docs/latest/api/python/index.html...
    mpro阅读 6,092评论 0 4
  • Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...
    哎哟喂喽阅读 6,612评论 0 51
  • Pair RDD基本操作 虽然大部分Spark的RDD操作都支持所有种类的对象,但是有少部分特殊的操作只能作用于键...
    LuciferTM阅读 5,824评论 1 6
  • 位于中国最南边的省份——海南岛,有两大主要城市,省会海口(位于海南省的北边),以及三亚(位于海南省最南边)。 去年...
    砚磊阅读 635评论 0 0
  • 最美不过四月天,恰逢天清气朗,邀上几位好友孩子们驱车前来感受龙井水库风釆。湖水一波万顷,却也不曾见识过如此静谧秀丽...
    小房子梅子姐姐阅读 1,518评论 0 2