什么是二次排序
- 二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[root@iteblog.com /tmp]# vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0
我们期望的输出结果是
2014-2 64
2015-1 3,4,21,24
2015-2 -43,0,35
2015-3 46,56
2015-4 5
spark 二次排序解决方案
我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部弄到一起,然后对同一个Key的所有Value进行排序即可。
scala 版实现过程,分为遍历输出和转成df格式,可进行下一步执行,重点理解groupByKey()算子和scala函数编程的思想。
package cn.ted.secondarySort
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/*
* Author: LiYahui
* Date: Created in 2019/3/1 11:21
* Description: TODO spark实现二次排序,key有序,value内部的数据同样有序
* Version: V1.0
*/
object SecondarySort {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "gzip")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
val inputPath = "F:\\LocalFileForTest\\secondarySort"
//将结果进行打印
sc.textFile(inputPath)
.map(line => {
val arr: Array[String] = line.split(",")
//对年份和月份进行拼接
val key: String = arr(0) + "-" + arr(1)
val value = arr(2)
//拼接成kv类型
(key, value)
})
.groupByKey()
.map(line => (line._1, line._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) //value内部进行升序排列
.sortByKey(true) //key升序排列
.collect()
.foreach(println)
import spark.implicits._
//转换成df格式进行计算
sc.textFile(inputPath)
.map(line => {
val arr: Array[String] = line.split(",")
//对年份和月份进行拼接
val key: String = arr(0) + "-" + arr(1)
val value = arr(2)
//拼接成kv类型
(key, value)
})
.groupByKey()
.map(line => (line._1, line._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) //value内部进行升序排列
.sortByKey(true) //key升序
.toDF("key", "value")
spark.stop()
sc.stop()
}
/**
* 数据源和期望结果
* [root@iteblog.com /tmp]# vim data.txt
* 2015,1,24
* 2015,3,56
* 2015,1,3
* 2015,2,-43
* 2015,4,5
* 2015,3,46
* 2014,2,64
* 2015,1,4
* 2015,1,21
* 2015,2,35
* 2015,2,0
* 我们期望的输出结果是
*
* 2014-2 64
* 2015-1 3,4,21,24
* 2015-2 -43,0,35
* 2015-3 46,56
* 2015-4 5
*
*/
}