Spark高级编程之二次排序

原始数据(每行两个整数,以空格分隔,保存为 sortnumber.txt):

2 6
3 7
1 5
2 4
3 6
1 3
2 1
3 1

1、Java版二次排序
首先定义排序的key

/**
 * Composite key used for secondary sorting.
 *
 * Keys are ordered by {@code first}; ties are broken by {@code second}.
 * All relational operators required by {@code Ordered} are derived from
 * {@link #compare(SecondarySortKey)} so that they can never disagree.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>,Serializable{
    private static final long serialVersionUID = -236567544543677678L;

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * @param first  primary sort column
     * @param second secondary sort column, used only when {@code first} is equal
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** @return true when this key sorts strictly after {@code that} */
    @Override
    public boolean $greater(SecondarySortKey that) {
        return compare(that) > 0;
    }

    /** @return true when this key sorts after, or equal to, {@code that} */
    @Override
    public boolean $greater$eq(SecondarySortKey that) {
        return compare(that) >= 0;
    }

    /** @return true when this key sorts strictly before {@code that} */
    @Override
    public boolean $less(SecondarySortKey that) {
        return compare(that) < 0;
    }

    /** @return true when this key sorts before, or equal to, {@code that} */
    @Override
    public boolean $less$eq(SecondarySortKey that) {
        return compare(that) <= 0;
    }

    /**
     * Lexicographic comparison on (first, second).
     *
     * Uses {@link Integer#compare(int, int)} rather than subtraction:
     * {@code a - b} overflows when the operands are far apart (for example
     * {@code Integer.MIN_VALUE - 1}) and then reports the wrong sign.
     *
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code that}
     */
    @Override
    public int compare(SecondarySortKey that) {
        int byFirst = Integer.compare(this.first, that.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, that.getSecond());
    }

    /** Same ordering as {@link #compare(SecondarySortKey)}. */
    @Override
    public int compareTo(SecondarySortKey that) {
        return compare(that);
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}

然后实现二次排序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

/**
 * Secondary sort driver: reads lines of the form "first second", sorts them by
 * the first number and then by the second, and prints the sorted lines.
 */
public class SecondarySort {

    /**
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        // Keep the original location as the default so existing runs are unchanged.
        String inputPath = args.length > 0 ? args[0] : "D:\\spark\\sortnumber.txt";
        SparkConf conf = new SparkConf()
                .setAppName("SecondarySort") // application name
                .setMaster("local");         // run locally
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Key every line by its two numeric columns; the value keeps the raw line.
            JavaPairRDD<SecondarySortKey, String> pairRDD = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
                @Override
                public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                    String[] split = line.split(" ");
                    // parseInt yields a primitive directly, avoiding the boxing done by valueOf.
                    SecondarySortKey key = new SecondarySortKey(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
                    return new Tuple2<SecondarySortKey, String>(key, line);
                }
            });

            // sortByKey orders the pairs using SecondarySortKey's own comparison.
            JavaPairRDD<SecondarySortKey, String> sortedPairs = pairRDD.sortByKey();

            // Drop the key again; only the original line is printed.
            JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
                @Override
                public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                    return v1._2;
                }
            });
            sortedLines.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
        } finally {
            // Release the context even when reading or parsing fails.
            sc.close();
        }
    }
}

2、Scala版二次排序
首先定义排序的key

/**
 * Composite key used for secondary sorting: ordered by `first`, ties broken by `second`.
 *
 * @param first  primary sort column
 * @param second secondary sort column, used only when `first` is equal
 */
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {
  /**
   * Lexicographic comparison on (first, second).
   *
   * Uses `Integer.compare` instead of subtraction: `a - b` overflows when the
   * operands are far apart (e.g. `Int.MinValue - 1`) and then reports the wrong sign.
   *
   * @return a negative value, zero, or a positive value when this key is
   *         less than, equal to, or greater than `that`
   */
  def compare(that: SecondarySortKey): Int = {
    val byFirst = java.lang.Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else java.lang.Integer.compare(this.second, that.second)
  }
}

然后实现二次排序

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Secondary sort driver: reads lines of the form "first second", sorts them by
 * the first number and then by the second, and prints the sorted lines.
 */
object SeconDarySort {

  /**
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    // Keep the original location as the default so existing runs are unchanged.
    val inputPath = args.headOption.getOrElse("D:\\spark\\sortnumber.txt")
    val conf = new SparkConf()
      .setAppName("SeconDarySort")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile(inputPath, 1)
      // Split each line once and key it by its two numeric columns.
      val pairs = lines.map { line =>
        val fields = line.split(" ")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      // sortByKey uses SecondarySortKey's ordering; only the raw line is printed.
      val sortedList = pairs.sortByKey().map(_._2)
      sortedList.foreach(println)
    } finally {
      // Release the context even when reading or parsing fails.
      sc.stop()
    }
  }
}

Java版和Scala版的排序结果一样(先按第一列升序,第一列相同时再按第二列升序):

1 3
1 5
2 1
2 4

2 6
3 1
3 6
3 7

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1.插入排序—直接插入排序(Straight Insertion Sort) 基本思想: 将一个记录插入到已排序好...
    依依玖玥阅读 1,285评论 0 2
  • 排序的基本概念 在计算机程序开发过程中,经常需要一组数据元素(或记录)按某个关键字进行排序,排序完成的序列可用于快...
    Jack921阅读 1,461评论 1 4
  • 没有驶向远方的轮船 就用躯体做前行的木筏 双桨长出知更鸟的翅膀 河流铺平鲸鱼的道路 胸膛里的一千只火把 在没有星光...
    黄英雄阅读 257评论 0 3
  • 荣故事:钻石戒指 结婚的时候,老公带我到银座买了六福的首饰,选中了一颗钻石。他选的我不喜欢,我又选了一颗大一点的钻...
    寇荣2020阅读 187评论 0 0
  • 没有宝宝喜欢一生气就开始大吼大叫的爸妈,有研究表明父母的情绪越是稳定孩子越具有幸福感和安全感。反之,则是对孩子的伤...
    孙老师说阅读 452评论 0 0