Spark RDD Transformations
value类型
map
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
val arr = Array(1,2,3,4,5,6)
val numRDD = sc.parallelize(arr)
val resultRDD = numRDD.map(x => "(" + x*x + ")")
resultRDD.foreach(print)
//输出结果
(1)(4)(9)(16)(25)(36)
mapPartition
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] =>Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
arrayRDD.mapPartitions(elements => {
val result = new ArrayBuffer[Int]
elements.foreach(e => {
result += e
})
result.iterator
}).foreach(println)
}
//输出结果
1
2
3
4
5
6
7
8
9
mapPartitionsWithIndex
//第二个参数指定分区数量
val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
//方法中有一个Int类型参数,指分区号;一个迭代器类型,保存分区内所有元素
arrayRDD.mapPartitionsWithIndex((index,elements) => {
println("partition index:" + index)
val result = new ArrayBuffer[Int]
elements.foreach(e => {
result += e
})
result.iterator
}).foreach(println)
输出结果
partition index:0
1
2
3
partition index:1
4
5
6
partition index:2
7
8
9
flatMap
flatMap根据切分的规则将集合内各元素切割,压平成无数个新的元素保存在一个集合中。
val words = Array("hello python","hello hadoop","hello spark")
val wordRDD = sc.parallelize(words)
wordRDD.flatMap(_.split(" ")).collect.foreach(println)
//输出结果
hello
python
hello
hadoop
hello
spark
map和mapPartition()的区别
- map每次处理一条数据
- mapPartition每次处理一个分区的数据,这个分区的数据处理完后,元RDD中分区的数据才能释放,可能导致OOM(内存溢出)
- 当内存空间较大的时候使用mapPartition,可以提高处理效率
filter
将集合按照指定的规则过滤,将符合过滤规则的元素返回到新的集合中。
val arr = Array(1,2,3,4,5,6)
val numRDD = sc.parallelize(arr)
val resultRDD = numRDD.filter(_%2==0)
resultRDD.foreach(println)
//输出结果
2
4
6
glom
每个分区对应一个数组,数组保存该分区中的所有元素,形成新的RDD类型RDD[Array[T]]
val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
arrayRDD.glom().collect.foreach(println)
//输出结果
[I@340b7ef6
[I@30404dba
[I@6050462a
可以看到三个数组对应的地址值。
groupBy 分组
分组,条件中的返回值作为key,将key值相同的元素放在同一个组里
返回值是k,v形式,其中V是一个集合的形式
val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3)
arrayRDD.groupBy(x => x%2).collect.foreach(println)
//输出结果
(0,CompactBuffer(2, 4, 6, 8))
(1,CompactBuffer(1, 3, 5, 7, 9))
sample(withReplacement,fraction, seed) 抽样
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
当选择不放回抽样时,fraction参数值必须在[0,1]之间。假如数据集中有100个元素,当fraction值为0.1时,抽样的个数大概
会在10个
当选择放回抽样时,fraction参数值表示的意思是数据集中每个元素期望被抽出来的次数(放回抽样的底层原理是泊松分布和伯努利分布),具体可以参考一下源码。
seed参数可以不指定,如果要指定建议使用System.currentTimeMillis(当前时间戳)。
val arrayRDD = sc.parallelize(1 to 10)
arrayRDD.sample(false,0.2).foreach(println)
//输出结果
2
5
8
distinct去重
val arrayRDD = sc.parallelize(Array(1,1,1,2,2,2,3,3,3,4,4,4))
val resultRDD = arrayRDD.distinct.collect
println(resultRDD.mkString(","))
//输出结果
2
5
8
coalesce缩减分区数
窄依赖,可以选择是否经过shuffle,默认不经过shuffle阶段,若指定经过shuffle阶段则功能和repartition类似
作用:经过大数据集的过滤后,减少分区数,提高小数据集效率
//初始化RDD,设置四个分区
val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val coaRDD = arrRDD.coalesce(2)
val resultRDD = coaRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,2)(0,3)(0,4)(1,5)(1,6)(1,7)(1,8)
将分区减少为两个,可以看出结果未经过shuffle阶段,将数据集前四个元素放到第一个分区,将后四个元素放到第二个分区。
试着将缩减分区数改为10(大于原分区数)
val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val coaRDD = arrRDD.coalesce(10)
val resultRDD = coaRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,2)(1,3)(1,4)(2,5)(2,6)(3,7)(3,8)
可以看到分区数还是四个,说明coalesce在默认情况下无法扩展分区数
repartition 重分区
宽依赖,经过shuffle阶段,可以扩大或者缩减分区数
val arrRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8),4)
val repRDD = arrRDD.repartition(2)
val resultRDD = repRDD.mapPartitionsWithIndex((index,value) => value.map((index,_)))
resultRDD.collect.foreach(print)
//输出结果
(0,1)(0,4)(0,5)(0,7)(1,2)(1,3)(1,6)(1,8)
将分区数指定为两个,可以看到输出结果中元素没有按照原有顺序写入分区,说那个repartition重分区经过了shuffle阶段。
将分区数改为6,再看输出结果。
(0,4)(1,1)(2,2)(2,7)(3,8)(4,5)(5,3)(5,6)
分区数变成六个。
coalesce和repartition的区别
下面是repartition的源码,可以看出repartiton底层是调用coalesce完成的,并且指定开启shuffle。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
sortBy(func,[ascending],[numTasks])
- sortBy根据指定的func返回值进行排序
- ascending是布尔型变量。默认为true,表示升序排序。false为降序排序
- 最后一个 参数用来指定分区数量
val arrRDD = sc.parallelize(Array(2,1,6,3,7,5,8,4))arrRDD.sortBy(x => x).collect.foreach(print)
//输出结果
12345678
pipe
管道,针对每个分区都执行一遍shell脚本,返回输出的RDD
脚本需要放在Worker节点能找到的位置
双Value类型交互
union
可以将两个rdd中的元素合并
val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.union(rdd2).collect.foreach(print)
//输出结果
1235724568
subtract
求两个rdd的差集。
差集:rdd1 - (rdd1 ∩ rdd2)
val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.subtract(rdd2).collect.foreach(println)
//输出结果
1
3
7
intersection
求交集
val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.intersection(rdd2).collect.foreach(println)
//输出结果
5
2
cartesian
笛卡尔积
val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.cartesian(rdd2).collect.foreach(print)
//输出结果
(1,2)(1,4)(1,5)(1,6)(1,8)(2,2)(2,4)(2,5)(2,6)(2,8)(3,2)(3,4)(3,5)(3,6)(3,8)(5,2)(5,4)(5,5)(5,6)(5,8)(7,2)(7,4)(7,5)(7,6)(7,8)
zip
将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
val rdd1 = sc.parallelize(Array(1,2,3,5,7))
val rdd2 = sc.parallelize(Array(2,4,5,6,8))
val resultRDD = rdd1.zip(rdd2).collect.foreach(print)
//输出结果
(1,2)(2,4)(3,5)(5,6)(7,8)
KeyValue类型
partitonBy
与repartition的区别:partitonBy可以修改分区器与分区个数,例如:哈希分区。
val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
println(rdd.partitionBy(new HashPartitioner(2)).partitions.size)
输出结果:
2
reduceByKey
将相同的key值聚合到一起,value自定义操作
val rdd = sc.parallelize(Array(("amy",1),("Bob",1),("Cindy",1),("amy",1),("amy",1),("Bob",1)))
val reduceRDD = rdd.reduceByKey((x,y) => x+y )
reduceRDD.collect.foreach(println)
结果:
(amy,3)
(Cindy,1)
(Bob,2)
groupByKey(K,Iterable)
按照key值进行分组,将value值放到迭代器中。
val rdd = sc.parallelize(Array(("amy",1),("Bob",1),("Cindy",1),("amy",1),("amy",1),("Bob",1)))
val reduceRDD = rdd.groupByKey().collect.foreach(println)
结果:
(amy,CompactBuffer(1, 1, 1))
(Cindy,CompactBuffer(1))
(Bob,CompactBuffer(1, 1))
reduceByKey和groupByKey的区别
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
aggregateBybyKey
作用:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
参数描述:
zeroValue:给每一个分区中的每一个key一个初始值;
seqOp:函数用于在每一个分区中用初始值逐步迭代value;
combOp:函数用于合并每个分区中的结果。
源码:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
从源码中可以看出,在各个分区里面zeroValue的U值传入啊seqOp与原始数据集的V进行运算,将运算结果作为U再次传入seqOp。
而各个分区互相之间在利用combOp函数进行聚合操作,最终可得到结果。
val data = List((1, 3), (1, 4), (2, 3), (3, 6), (1, 2), (3, 8))
val rdd = sc.parallelize(data, 3)
rdd.aggregateByKey(0)(math.max(_,_), _ + _).collect.foreach(println)
结果:
(3,14)
(1,6)
(2,3)
foldByKey
处于reduceByKey与aggregateByKey之间,可以指定初始值,但是无法将分区内和分区之间的两个操作自定义执行。
val data = List(("amy",1),("amy",1), ("Bob",1),("Bob",1), ("Bob",1), ("Cindy",1))
val rdd = sc.parallelize(data, 2)
rdd.foldByKey(0)(_+_).collect.foreach(println)
结果:
(amy,2)
(Cindy,1)
(Bob,3)
combineByKey
对于相同的key,将value合并并聚合
//源码
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
从源码中可以看出函数执行分为三个阶段,详见下方案例。
案例:求相同key对应的所有value的平均值
val result = rdd.combineByKey(
//1.createCombiner,将原数据集中读取的value值
//根据需求转换为指定的格式,这里转换为一个元组
(_, 1),
//2.mergeValue,将读取进来的相同key下的value做累加
//求平均值,所以第一个值用来求和,
//第二个值用来记录总数,
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
//3.mergeCombiners,将不同分区之间的数据累加
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).collect
/**
* rdd的输出结果,第一个值为和,第二个值为总数
* (b,(74,3))
* (a,(35,2))
* (c,(26,1))
*/
result.map(x => (x._1,x._2._1 / x._2._2)).foreach(println)
结果:
(b,24)
(a,17)
(c,26)
sortByKey
根据key值进行排序,其中key值必须为可排序的类型。排序结束返回一个RDD。
val data = List(("b", 26),("a", 10), ("c", 26), ("a", 25), ("b", 15) , ("b", 33))
val rdd = sc.parallelize(data, 2)
val result = rdd.sortByKey().collect.foreach(println)
结果:
(a,10)
(a,25)
(b,26)
(b,15)
(b,33)
(c,26)
mapValues
与map的功能类似,不过mapValues是针对键值对的数据类型且对其中的value进行操作。
val data = List(("b", 1),("a", 1), ("c", 1), ("a", 1), ("b", 1) , ("b", 1))
val rdd = sc.parallelize(data, 2)
val result = rdd.mapValues(x => x+1).collect.foreach(println)
结果:
(b,2)
(a,2)
(c,2)
(a,2)
(b,2)
(b,2)
join
当两个RDD的key值相同时,使用join将两个RDD的value聚合,返回结果为(key,(value1,value2))
val data1 = List(("a",1),("b",1),("c",1))
val data2 = List(("a",2),("b",2),("c",2))
val rdd1 = sc.parallelize(data1)
val rdd2 = sc.parallelize(data2)
rdd1.join(rdd2).collect.foreach(x => print(x + ";"))
结果:
(a,(1,2));(b,(1,2));(c,(1,2));
cogroup
与join类似,输出格式为(K,(Iterable<V>,Iterable<W>)),指将两个rdd相同key下的value放到相对应的迭代器中。
val data1 = List(("a",1),("b",1),("c",1))
val data2 = List(("a",2),("a",3),("b",2),("c",2))
val rdd1 = sc.parallelize(data1)
val rdd2 = sc.parallelize(data2)
rdd1.cogroup(rdd2).collect.foreach(println)
结果:
(a,(CompactBuffer(1),CompactBuffer(2, 3)))
(b,(CompactBuffer(1),CompactBuffer(2)))
(c,(CompactBuffer(1),CompactBuffer(2)))