弹性分布式数据集(Resilient Distributed Dataset-RDD)
我使用spark比较晚,所以我使用dataframe比较多,听说rdd这块以后spark也停止更新了,但是目前dataframe还是不如rdd灵活,而且spark SQL一些方法不大稳定,有一些rdd的技巧还是要继续使用。
下面是整理的常用操作,一部分来自书里,一部分是自己整理的。
1.创建RDD
两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。
1.Scala 中的parallelize()方法,直接把程序中一个已有的集合传给这个方法就可以生成一个RDD。
val lines = sc.parallelize(List("pandas","i like pandas"))
val lines = sc.parallelize(Array(1,2,3,4,5,6))
用case class来规范数据类型
case class UserData(userID:String,userName:String,userAddress:String)
val lines = sc.parallelize(
List("1234","pandas","chengdu"),
List("4321","tiger","jiling"),
List("1111","bird","china")
)
2.更常用的方式是从外部存储中读取数据来创建RDD。
val lines = sc.textFile("D://wenben.txt")
3.使用makeRDD来构造(不建议使用)
val rdd01 = sc.makeRDD(List((1,2,null,null),(1,2,3,4),(4,3,null,null)))
2.RDD操作
RDD支持两种操作:转化操作和行动操作
转化操作返回的是RDD,而行动操作返回的是其他的数据类型。
2.1.转化操作(lazy模式):
最常用的转化操作是map()和filter(),inputRDD{1,2,3,4}
inputRDD.map(x => x * x)
Mapped RDD{1,4,9,16}
把列类型转化为字符串,map中通常使用v(0),索引从0开始
inputRDD.map(v => (v(0).toString,v(1).toString))
inputRDD.filter(x => x! = 1)
Filtered RDD{2,3,4}
filter筛选条件为,第一列字符串长度大于10,第二列字符串长度大于5。这里索引从1开始。
inputRDD.filter(v => v._1.length > 10 && v._2.length > 5)
flatMap和map操作
flatMap()相当于把map的作用的数组拆散再合并。
import org.apache.spark.{SparkConf, SparkContext}
object MapAndFlatMap {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("map_flatMap_demo").setMaster("local"))
val arrayRDD =sc.parallelize(Array("a_b","c_d","e_f"))
arrayRDD.foreach(println)
/*
结果 :
a_b
c_d
e_f
*/
arrayRDD.map(string=>{
string.split("_")
}).foreach(x=>{
println(x.mkString(","))
})
/*
结果:
a,b
c,d
e,f
所以map得到的RDD结果是 Array(Array("a","b"),Array("c","d"),Array("e","f"))
*/
arrayRDD.flatMap(string=>{
string.split("_")
}).foreach(x=>{
println(x.mkString(","))//打印结果3
})
/*
结果:
a
b
c
d
e
f
所以flatMap得到的RDD结果是Array("a","b","c","d","e","f")
*/
}
}
sample操作
sample操作有两个参数,第一个参数是代表采样是有放回还是无放回,第二个参数代表抽样的比例。
1、withReplacement:元素可以多次抽样(在抽样时替换)
2、fraction:期望样本的大小作为RDD大小的一部分,
当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
3、seed:随机数生成器的种子
例子:
1.元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(6,2,8,4));
JavaRDD<Integer> res = rdd.sample(false, 0.5);
res.foreach(x -> System.out.print(x +" "));
//结果:2 8 4
2.元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(6,2,8,4));
JavaRDD<Integer> res = rdd.sample(true, 2);
res.foreach(x -> System.out.print(x +" "));
//结果:6 2 8 8 8 8 8 4
map和mapPartitions
(1)、使用map(func())遍历
map相当于遍历,遍历1000行的表,就要调用func一千次。
(2)、使用mapPartitions(func())遍历
mapPartition中func()仅仅被调用分区数量的次数,例如10个分区,仅仅调用10次。假如函数内部存在分词词库导入关闭,数据库链接等等,使用map每调用func()一次都要跑一遍这些操作,严重影响性能。
这时候就需要把map改写成mapPartitions
改写方式也很简单:
例如可以用一下map,几乎不做任何操作:
var data_rdd = df.rdd.map{x => {
val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
(imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
}}
改写成mapPatitions需要使用一下迭代器:
val data_rdd = df.rdd.repartition(200).mapPartitions(iter => for (x <- iter) yield {
val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
(imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
})
zip和zipPartitions
zip
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21
scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))
scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常
zipPartitions
zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。
该函数有好几种实现,可分为三类:
参数是一个RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息
映射方法f参数为两个RDD的迭代器。
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21
//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){
| (rdd1Iter,rdd2Iter) => {
| var result = List[String]()
| while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
| result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
| }
| result.iterator
| }
| }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
参数是两个RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21
scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21
//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
| (rdd1Iter,rdd2Iter,rdd3Iter) => {
| var result = List[String]()
| while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
| result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
| }
| result.iterator
| }
| }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27
scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
参数是三个RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
用法同上面,只不过这里又多了个一个RDD而已。
zipWithIndex、zipWithUniqueId
zipWithIndex
def zipWithIndex(): RDD[(T, Long)]
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
zipWithUniqueId
def zipWithUniqueId(): RDD[(T, Long)]
该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
看下面的例子:
scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
//rdd1有两个分区,
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//总分区数为2
//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
其他常用转化操作
1.对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 函数应用于rdd中每个元素 | rdd.map(x => x+1) | {2,3,4,4} |
flatMap() | 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词,执行扁平化操作。 | rdd.flatMap(x => x.to(3)) | {1,2,3,2,3,3,3} |
filter() | 返回一个由通过传给filter()的函数的元素组成的RDD | rdd.filter(x => x!=1) | {2,3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement,fraction,[seed]) | 对RDD采样,以及是否替换,抽取比例等等 | rdd.sample(false,0.5) | {1,2,3} 非确定的 |
2.对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD (不去重的并集) | rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求两个RDD共同的元素的RDD (交集) | rdd.intersection(other) | {3} |
subtract() | 求移除一个RDD中的内容 (差集) | rdd.subtract(other) | {1,2} |
cartesian() | 笛卡尔积 | rdd.cartesian(other) | {(1,3),(1,4),...,(3,5)} |
2.2.行动操作:
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD中元素的个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出现的次数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take(num) | 从RDD中返回num个元素 | rdd.take(2) | {1,2} |
top(num) | 从RDD中返回最前面的num个元素 | rdd.top(2) | {3,3} |
takeOrdered(num) | 从RDD中按照提供的顺序返回最前面的num个元素 | rdd.takeOrdered(2)(myOrdering) | {3,3} |
takeSampe(withReplacement,num,[seed]) | 从RDD中返回任意一些元素 | rdd.takeSample(false,1) | 非确定的 |
reduce(func) | 并行整合RDD中所有数据 | rdd.reduce((x,y) => x+y) | 9 |
fold(zero)(func) | 和reduce类似,但是需要提供初始值 | rdd.fold(0)((x,y) => x+y) | 9 |
aggregate(zeroValue)(seqOp,comOp) | 和reduce相似,但是通常返回不同类型的函数 | rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) | (9,4) |
foreach(func) | 对RDD中的每个元素使用给定的函数 | rdd.foreach(func) | 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值 |
first操作
返回RDD中第一个元素
collect操作
以集合形式返回RDD的元素(常用语小的数据集)
比如某个文件数据
中国
美国
加拿大
……
读取的时候希望把文件中的元素放到列表里
import scala.collection.mutable.ArrayBuffer
val stat_array = new ArrayBuffer[String]()
rdd.collect.foreach(v => (stat_array += v))
take操作
take(num:Int):将RDD作为集合,返回集合中[0,num-1]下标的元素
reduce操作
reduce(f:(T,T)=>T):对RDD中元素进行二元计算,返回计算结果
top操作
top(num:Int):按照默认的或者是指定的排序规则,返回前num个元素
takeOrdered操作
takeOrdered(num:Int):和top相反,返回前num个元素
foreach和map的区别:
foreach是action操作,它没有返回值,通常用于print或者写入文件等操作。
map是transform操作(lazy),生成一个新的rdd
sortBy函数(在org.apache.spark.rdd.RDD类中)
共三个参数:
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,该参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等
例如:
data: (item_num, item_id, item_name)
val data_sorted = data.sortBy(_._1, ascending = false)
根据item_num从大到小排列(降序)。
2.3.键值对操作
2.3.1.partitionBy,mapValues,flatMapValues
partitionBy,mapValues,flatMapValues和基本转换操作中的repatition,map和flatMap功能类似。
partitionBy接口根据partitioner函数生成新的ShuffledRDD,将原RDD重新分区(在repartition中也是先将RDD[T]转化成RDD[K,V],这里V是null,然后使用RDD[K,V]作为函数生成ShuffledRDD)。mapValues和flatMapValues针对[K,V]中的V值进行map操作和flatMap操作。
使用partitionBy,mapValues,flatMapValues不会破坏原数据的partition的结构&信息,使用repatition,map和flatMap后还需要做Shuffle,数据就不带有原先的partition信息。所以键值对操作尽量使用partitionBy,mapValues,flatMapValues,少用repatition,map和flatMap。
2.3.2.combineByKey,foldByKey,reduceBykey,groupByKey
四种键值对转换操作都是针对RDD[K,V]本身,不涉及与其他RDD的组合操作,四种操作类型最终都会归结为堆combinByKey的调用。combineByKey接口是将RDD[K,V]转化成返回类型RDD[K,C],这里V类型与C类型可以相同也可以不同。
groupbykey效率很低,尽量使用reducebykey和combinebykey来代替groupbykey
reduceByKey函数
reduceByKey(_+_) 相当于 reduceByKey((a,b)=>(a+b))
reduceByKey(_++_) 相当于 reduceByKey((a,b)=>(a++b))
这里a和b 是一个list,++用于合并列表
combineByKey函数
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
其中的参数:
createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
numPartitions:结果RDD分区数,默认保持原有的分区数
partitioner:分区函数,默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
举例理解:
假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:
1 定义我们需要什么样的果汁。
2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。--相当于hadoop中的local combiner
3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。--相当于全局进行combiner
那么对比上述三步,combineByKey的三个函数也就是这三个功能
1 createCombiner就是定义了v如何转换为c
2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
3 就是定义了如何将相同key下的C给合并成一个C
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1.combineByKey(
(v : Int) => List(v), --将1 转换成 list(1)
(c : List[Int], v : Int) => v :: c, --将list(1)和2进行组合从而转换成list(1,2)
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2 --将全局相同的key的value进行组合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
简要介绍
def combineByKey[C](createCombiner: (V) => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RD
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就
和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更
多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各
个分区的结果进行合并。
创建一个学生成绩说明的类
case class ScoreDetail(studentName: String, subject: String, score: Float)
下面是一些测试数据,加载测试数据集合 key = Students name and value = ScoreDetail instance
val scores = List(
ScoreDetail("xiaoming", "Math", 98),
ScoreDetail("xiaoming", "English", 88),
ScoreDetail("wangwu", "Math", 75),
ScoreDetail("wangwu", "English", 78),
ScoreDetail("lihua", "Math", 90),
ScoreDetail("lihua", "English", 80),
ScoreDetail("zhangsan", "Math", 91),
ScoreDetail("zhangsan", "English", 80))
换成二元组, 也可以理解成转换成一个map, 利用了for 和 yield的组合
val scoresWithKey = for { i <- scores } yield (i.studentName, i)
val scoresWithKeyRDD = sc.parallelize(scoresWithKey).partitionBy(new org.apache.spark.HashPartitioner(3)).cache
聚合求平均值让后打印
val avgScoresRDD = scoresWithKeyRDD.combineByKey(
(x: ScoreDetail) => (x.score, 1) /*createCombiner*/,
(acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
(acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
// calculate the average
).map( { case(key, value) => (key, value._1/value._2) })
avgScoresRDD.collect.foreach(println)
/*输出:
(zhangsan,85.5)
(lihua,85.0)
(xiaoming,93.0)
(wangwu,76.5)
*/
解释一下scoresWithKeyRDD.combineByKey
createCombiner: (x: ScoreDetail) => (x.score, 1)
这是第一次遇到zhangsan,创建一个函数,把map中的value转成另外一个类型 ,这里是把(zhangsan,(ScoreDetail类))转换成(zhangsan,(91,1))
mergeValue: (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) 再次碰到张三, 就把这两个合并, 这里是将(zhangsan,(91,1)) 这种类型 和 (zhangsan,(ScoreDetail类))这种类型合并,合并成了(zhangsan,(171,2))
mergeCombiners (acc1: (Float, Int), acc2: (Float, Int)) 这个是将多个分区中的zhangsan的数据进行合并, 我们这里zhansan在同一个分区,这个地方就没有用上
contRdd.combineByKey(
(score:(String,Long)) => Map(score._1 -> score._2),
(c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)),
(c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )
topk问题:
https://www.twblogs.net/a/5c602891bd9eee06ef371458/zh-cn
2.4 控制操作
2.4.1. cache,persist
cache底层是调用了persist。
2.4.2. checkpoint
3 例子:
3.1 join或者其他消耗较大的处理前先进行聚合操作
join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。
假设,你有一个RDD存着(熊猫id,分数),另外一个RDD存着(熊猫id,邮箱地址)。若你想给每只可爱的熊猫的邮箱发送她所得的最高的分数,你可以将RDD根据id进行join,然后计算最高的分数,如下:
def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
val joinedRDD = scoreRDD.join(addressRDD)
joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
然而,这可能不会比先减少分数数据的方案快。先计算最高的分数,那么每个熊猫的分数数据就只有一行,接下来再join地址数据:
def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
bestScoreData.join(addressRDD)
}
若每个熊猫有1000个不同的分数,那么这种做法的shuffle量就比上一种的小了1000倍。
Reference:
https://blog.csdn.net/weixin_42181200/article/details/81201864
http://lxw1234.com/archives/2015/07/350.htm
【Spark快速大数据分析】
【Spark SQL入门与实践指南】