1. Value 型
(1) 输入、输出分区 一对一
-
map
// 将数据逐个迭代,生成新的值或键值对 val rdd1 = sc.parallelize(1 to 3, 3) rdd1.map(x => (x, 1))
-
flatMap
// 合并每个分区中的元素,生成一个新的集合 val rdd1 = sc.parallelize(1 to 3, 3) rdd1.flatMap(x => x) // => Array( 1, 2, 3 )
-
mapPartitions
// 将每个分区进行一次map val rdd1 = spark.sparkContext.parallelize(1 to 9, 3) rdd1.mapPartitions{iter => val list = ListBuffer[Int]() while(iter.hasNext){ val item = iter.next() list.append(item * 2) } list.toIterator // 返回迭代器 }.collect().foreach(println)
-
glom
// 分区 => 数组 val rdd1 = sc.parallelize(1 to 3, 3) rdd1.glom.collect // => Array( Array(1), Array(2), Array(3) )
(2) 输入、输出分区 多对一
-
union:并集(不去重)
// 合并多个 RDD rdd1.union(rdd2) // rdd1 + rdd2
-
intersection:交集(去重)
rdd1.intersection(rdd2) // rdd1 * rdd2
-
subtract:差集
rdd1.subtract(rdd2) // rdd1 - rdd2
-
cartesian:笛卡尔积
// 对 RDD 进行笛卡尔积计算 rdd1.cartesian(rdd2)
(3) 输入、输出分区 多对多
-
groupBy
// 分组 val rdd1 = sc.parallelize(1 to 9, 3) rdd1.groupBy(x => { if(x % 2 == 0) "偶数" else "奇数" })
(4) 输出为输入的子集
-
filter
// 按一定条件过滤数据 val rdd1 = sc.parallelize(1 to 9, 3) rdd1.filter(x => x % 2 == 0)
-
distinct
// 去重 rdd1.distinct
-
sample
// 采样 val rdd1 = sc.parallelize(1 to 100000, 3) /** * 参数 * (1) withReplacement: Boolean 是否有放回采样 * (2) fraction: Double 采样比例 * (3) seed: Long 种子 */ rdd1.sample(false, 0.1, 0L)
-
takeSample
// takeSample 与 sample 不同的是:不使用相对比例采样,而是按设定的个数进行采样 rdd1.takeSample(true, 20, 0L)
- Cache型
- cache
- persist
2. Key-Value 型
(1) 输入、输出分区一对一
-
mapValues
// 对 value 进行操作 val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = rdd1.map(x => (x.length, x)) // Array( (3, dog), (3, cat), ... ) rdd2.mapValues(x => "#" + x + "#") // Array( (3, #dog#), (3, #cat#), ... ) // 只有一个参数时,可以使用通配符 "_" rdd2.mapValues("#" + _ + "#")
-
zip
// 合并两个 RDD 为键值对 // 注意分区数、元素数量 必须相同 val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd2.zip(rdd1) // Array((1,dog), (1,cat), (2,tiger), (3,lion), (3,mouse))
(2) 对单个 RDD 聚集
-
combineByKey
val rdd1 = sc.parallelize(List("dog","cat","tiger","lion","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd2.zip(rdd1) // Array((1,dog), (1,cat), (2,tiger), (3,lion), (3,mouse)) // 根据 Key 合并 val rdd4 = rdd3.combineByKey( List(_), // createCombiner (x: List[String], y: String) => y::x, // mergeValue (x: List[String], y: List[String]) => x:::y // mergeCombiners ) // Array((3,List(rabbit, cat)), (5,List(bee)), (2,List(dog, bear)))
-
reduceByKey
val rdd1 = sc.parallelize(List("dog","cat","tiger","dog","mouse"), 2) val rdd2 = sc.parallelize(List(1,1,2,3,3), 2) val rdd3 = rdd1.zip(rdd2) // Array((dog,1), (cat,1), (tiger,2), (dog,3), (mouse,3)) // 根据 Key 相减 val rdd4 = rdd3.reduceByKey(_+_) // Array((dog,4), (cat,1), (tiger,2), (mouse,3))
- partitionByKey
(3) 对两个 RDD 聚集
Cogroup
-
连接
- join
- leftOutJoin
- rightOutJoin
3. Action 算子
- 无返回值
- foreach
- HDFS 输出
- saveAsTextFile
- saveAsObjectFile
- 集合、数据类型
- collect
- collectAsMap
- reduceByKeyLocally
- lookup
- count
- top
- reduce
- fold
- aggregate
4. 数据加载算子
- 文件读取
- textFile
- 内存生成
- makeRDD
- parallelize