一、单个RDD的操作
1、map、mapPartition、mapPartitionsWithIndex
map:以一条记录为单位进行操作,返回一条数据
map((_,1))
map(x =>{
返回值:单个值或者键值对
})
mapPartition:以分区为单位进行操作
mapPartitionsWithIndex:以分区为单位进行操作,与mapPartition不同的是带下标
2、flatMap:输入一条数据,返回多条数据
比map多了一个flatten过程
Val arr=Array(“hello hadoop”,”hello hive”,”hello spark ”)
Val map=arr.map(_.split(“ ”)// Array(Array(hello,hadoop),Array(hello,hive),Array(hello,spark))
Map.flatten //Array(hello,hadoop,hello,hive,hello,spark)
Arr.flatMap(_.split(“ ”)) // Array(hello,hadoop,hello,hive,hello,spark)
3、filter:过滤器
rdd.filter(_.trim.length>0)
4、sample(withReplacement:boolean(是否放回抽取了的元素,true放回,false不放回),franction:Double(抽取比例),seed:Long(抽样算法的随机数种子,默认为随机数,可以省略)):随机抽样
5、union、intersection
union:把两个RDD进行逻辑上的合并
intersection:求两个RDD的交集
val rdd1=sc.makeRDD(1 to 3)
val rdd2=sc.parallelize(4 until 6)
rdd1.union(rdd2)
6、sortBy和sortByKey
sortBy:手动指定排序的字段,第一个参数(_.1或者._2之类的),第二个参数可省略,表示升序,加false表示降序
sortByKey:按key进行排序,加false表示降序
7、groupByKey和reduceByKey
groupByKey:按相同的key进行分组(key,CompactBuffer(1,1,1,1))
reduceByKey:按相同的key进行分组,并且对value进行操作,假如是相加操作,reduceByKey(+)结果就是(key,4)
8、distinct:去掉重复的数据
9、coalesce、repartition、partitionBy分区函数
coalesce:有两个参数,第一个是分几个区,第二个参数是决定是否shuffle。默认为false,不可省略
1)、如果重分区的数量大于原来的数量,如果不shuffle,也就是第二个参数不为true,分区数不会发生变化,如果设置为true,增加分区会把原来的分区的数据随机分配给设置的分区中
2)、如果重分区的数量小于原来的数量,如过不shuffle,原来的分区里面的数据会整块整块的分到新的分区,如果shuffle,则会将原来的数据打乱按一定的规则重新分配给设置的分区
repartition:将数据集经过shuffle重新分为几个区,相当于coalesce(int n,true)
partitionBy:自定义分区器,重新分区
自定义分区器:
Calss MyPartition extends Partitioner{
//分区数量为多少
Override def numPartitions:Int=2
//自定义分区规则
Override def getPartition(key:Any):Int={
If(key.HashCode()%2==0){
0
}else{
1
}
}
}
在main函数中调用
partitionBy(new MyPartition)
Scala也有一个自有的封装的HashPartition(int n),按照hashcode值分为几个区
10、repartitionAndSortWithPartitions:重新分区并按照分区排序
Rdd. repartitionAndSortWithPartitions(new HashPartition(4))
11、glom:把分区中的元素封装到数组中
12、randomSplit:拆分RDD,将原有的RDD按照一定的比例拆分为多个分区,在数量不大的情况下,实际结果不一定准确
Val rdd=sc.parallelize(1 to 10)
Rdd. randomSplit(Array(0.1,0.2,0.3,0.4)).foreach(x => {println(x.count)})
理论结果:
1
2
3
4
Array里面的数值是多少无所谓,最后是按几个数字的比例来分配
二、多个RDD的操作
1、join、leftOuterJoin、rightOuterJoin、fullOuterJoin
Join:连接两个RDD
leftOuterJoin:左外连接
rightOuterJoin:右外连接
fullOuterJoin:全外连接
与oracle的内外连接类似
2、cogroup:将多个RDD中同一个key对应的value组合到一起
Val arr=arr1.cogroup(arr2,arr3,arr4)
3、cartesian:求笛卡尔积
假如rdd1有三个元素,rdd2有4个元素,rrd1. Cartesian(rdd2)就有12个元素
4、zip、zipWithIndex、zipWithUniqueId
Zip:两个非k,v格式的RDD,通过一一对应的格式压缩为k,v格式的RDD,要求:
1)、分区数量需要相同
2)、分区中的元素个数相等
rdd1.zip(rdd2)
zipWithIndex:将RDD变成K,V格式的RDD
K:这个RDD的元素
V:这个元素在这个RDD中的索引
rdd. zipWithIndex()
zipWithUniqueId:按hashCode码拉取数据
假如rdd有两个分区,数据分别是1,2,3,4,5和6,7,8,9,10
那么rdd. zipWithUniqueId()的结果是:
(1,0)
(2,2)
(3,4)
(4,6)
(5,8)
和
(6,1)
(7,3)
(8,5)
(9,7)
(10,9)