在对RDDPair(一种特殊的 RDD,即RDD[(key, Row)])进行操作时经常会用到 reduceByKey() 和 groupByKey() 两个算子。下面看看两者的区别和使用方法:
一、reduceByKey(func) 和 groupByKey() 的区别
reduceByKey(func):顾名思义,是针对 RDDPair 中具有
相同 key 的所有 row 做 reduce 操作
,操作内容由函数 func 确定,可以自定义,比如:形如 (0, BACA) 这样的 row,现在需要对 key 相同的所有row(即 BACA)使用"-"拼接成一个长字符串,比如(1,TMWTYV-PYSAJV)
;groupByKey(): 顾名思义,是针对 RDDPair 中具有
相同 key 的所有 row 分组
,相同 key 对应的 row 汇总生成一个sequence;本身不能自定义函数,只能通过额外通过map(func)来实现。比如:(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
。
使用reduceByKey()的时候,本地的数据先进行merge
然后再传输到不同节点再进行merge,最终得到最终结果。
而使用groupByKey()的时候,并不进行本地的merge
,全部数据传出,得到全部数据后才会进行聚合成一个sequence,groupByKey()传输速度明显慢于reduceByKey()。
虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是优先使用reduceByKey(func)
。
区别:
区别项 | reduceByKey | groupByKey | 备注 |
---|---|---|---|
功能 | 针对 RDDPair 中具有相同 key 的所有 row 做 reduce 操作 | 针对 RDDPair 中具有相同 key 的所有 row 分组 | 无 |
能自定义函数 | 可以自定义reduce函数 | 否 | 无 |
输出 | 一个 key 对应一个row | 一个key 对应多个row的sequence | 无 |
性能 | 更高 | 更低 | groupByKey.map(func) 可以实现 reduceByKey,但是尽量用 reduceByKey,因为更高效 |
二、Scala 代码--使用方法
- rddMap.groupByKey(自定义partitioner);
- rddMap.reduceByKey(自定义reduce函数) 或者类似 rddMap.reduceByKey(_ + "-" + ) ,其中 _ + "-" + _ 中的""表示 key 相同的两个 row
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object TestSparkShuffle {
class MyPartitioner(partitionNum: Int) extends Partitioner() {
override def numPartitions: Int = partitionNum
override def getPartition(key: Any): Int = {
if (key.asInstanceOf[Int] % 2 == 0) {
0
} else {
1
}
}
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local").appName("test").getOrCreate()
val sc = spark.sparkContext
val arr = new ArrayBuffer[String]
genStrArr(36, arr)
val rdd = sc.parallelize(arr)
val rddMap: RDD[(Int, String)] = rdd.mapPartitions(
partition => {
partition.map(str => (getKey(str), str))
}
)
rddMap.foreach(x => println(x))
// 按照 key 进行分组,且key为奇、偶数的row各分在0、1分区内
val rddMap2 = rddMap.groupByKey(new MyPartitioner(2))
rddMap2.foreach(x => println(x))
// 对 rddMap 中的row按照row的key,同样的key的value相继使用"-"拼接起来
val rddMap3 = rddMap.reduceByKey(reduceFunc)
// val rddMap3 = rddMap.reduceByKey(_ + "-" + _) // _ + "-" + _ 中的"_"表示 key 相同的两个value
rddMap3.foreach(x => println(x))
println(rddMap.count())
}
// reduce 函数,将两个字符串使用"-"拼接
def reduceFunc(x: String, y : String): String = {
x + "-" + y
}
def getKey(str: String): Int = {
Math.abs(str.hashCode % 6)
}
// 生成size为num的字符串数组,每个字符串长度为6,由A~Z随机构成
def genStrArr(num : Int, arr: ArrayBuffer[String]): Unit = {
val baseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
val charLen = 6
val rand = new Random()
for (x <- Range(0, num)) {
var subStr = ""
for (i <- Range(0, charLen)) {
val order = rand.nextInt(baseChars.length)
subStr += baseChars.charAt(order)
}
arr.append(subStr)
}
}
}
测试结果:
# groupByKey 结果
(4,CompactBuffer(HCAESV, OZNIQU, WIIWNX, MEFMUZ, TVFPRH, EMSZJC))
(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
(2,CompactBuffer(XTAKJH, HOUFFR, KIJCNU, BDILZU, SJFGRN, IZPCHR, RIPRRA, UUGZER))
(1,CompactBuffer(TMWTYV, PYSAJV))
(3,CompactBuffer(UHQTWN, YSLXXE, PNIMWJ, NAYYWU, EYPRPM, SXGUQO, DDSNIY, EXPSPM))
(5,CompactBuffer(ZOGCRZ, VORGBM, CUZZFS, SLFBWC, PFRFRA))
# reduceByKey 结果
(4,HCAESV-OZNIQU-WIIWNX-MEFMUZ-TVFPRH-EMSZJC)
(0,ZCEXLX-BKSGQD-ICRWVA-PXFBAC-SUBCYR-OMEQVV-TMBPHW)
(1,TMWTYV-PYSAJV)
(3,UHQTWN-YSLXXE-PNIMWJ-NAYYWU-EYPRPM-SXGUQO-DDSNIY-EXPSPM)
(5,ZOGCRZ-VORGBM-CUZZFS-SLFBWC-PFRFRA)
(2,XTAKJH-HOUFFR-KIJCNU-BDILZU-SJFGRN-IZPCHR-RIPRRA-UUGZER)