import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object Spark03 {
//二次排序
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").appName("03").getOrCreate()
import spark.implicits._
Logger.getRootLogger.setLevel(Level.WARN)
//sparkcore 计算方式
fun1(spark.sparkContext)
//sparksql 计算方式
fun2(spark)
spark.stop()
}
//使用sparkcore实现
/*
sparkcore实现思路:
第一步: 自定义 key 实现scala.math.Ordered接口和Serializeable接口
第二步:将要进行二次排序的数据加载, 将数据映射为 <key,value> 格式的RDD
第三步:使用sortByKey 基于自定义的key进行二次排序
第四步:去掉排序的key,只保留排序的结果
*/
def fun1( sc:SparkContext ):Unit ={
val data = List("hadoop spark scala spark", "scala mr", "spark scala mr java" )
sc.parallelize(data, 3)
// 正则表达式 \\s 表示空白字符
.flatMap( _.split("\\s+") )
.map( (_,1) )
.reduceByKey( _+_ )
.map( bean =>{
( MySortKey(bean._1, bean._2), bean )
})
.sortByKey(false,1)
.map(_._2)
.foreach( println )
}
//使用sparksql实现
/**
使用sql完成二次排序
select * from tablename order by field1, field2 desc
*/
def fun2( spark:SparkSession ):Unit ={
import spark.implicits._
val data = List("hadoop spark scala spark", "scala mr", "spark scala mr java" )
spark.sparkContext.parallelize(data,3)
.flatMap( _.split("\\s+") )
.map( (_,1) )
.reduceByKey( _+_ )
.toDF("word","count")
.createOrReplaceTempView("temptable")
val sql = s"select word,count from temptable order by count desc,word "
spark.sql(sql).show(false)
}
//Ordered[自定义数据类型] 用来自定义排序规则
case class MySortKey(first:String, second:Int) extends Ordered[MySortKey]{
//重写比较方法
// 首先 按照second字段 数字大小降序, 之后按照 first字段 字典升序
override def compare(that: MySortKey): Int = {
val temp = this.second - that.second
if ( temp != 0 ){
temp
}else{
- this.first.compareTo(that.first)
}
}
}
}
spark二次排序案例
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 最近在项目中遇到二次排序的需求,和平常开发spark的application一样,开始查看API,编码,调试,验证...