原始数据:
2 6
3 7
1 5
2 4
3 6
1 3
2 1
3 1
1、Java版二次排序
首先定义排序的key
/**
 * Composite key for secondary sorting: orders by {@code first}, and breaks
 * ties by {@code second}, both ascending.
 *
 * Implements {@code Ordered} so the key can be compared by sort operations,
 * and {@code Serializable} so instances can be serialized.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>,Serializable{
    private static final long serialVersionUID = -236567544543677678L;

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * @param first  primary sort column
     * @param second secondary sort column, used only when {@code first} ties
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** @return true when this key sorts strictly after {@code that} */
    @Override
    public boolean $greater(SecondarySortKey that) {
        return compare(that) > 0;
    }

    /** @return true when this key sorts after, or equal to, {@code that} */
    @Override
    public boolean $greater$eq(SecondarySortKey that) {
        return compare(that) >= 0;
    }

    /** @return true when this key sorts strictly before {@code that} */
    @Override
    public boolean $less(SecondarySortKey that) {
        return compare(that) < 0;
    }

    /** @return true when this key sorts before, or equal to, {@code that} */
    @Override
    public boolean $less$eq(SecondarySortKey that) {
        return compare(that) <= 0;
    }

    /**
     * Compares by {@code first}, then by {@code second}.
     *
     * Uses {@link Integer#compare(int, int)} rather than subtraction:
     * a difference such as {@code Integer.MIN_VALUE - 1} overflows and
     * would report the wrong sign.
     *
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code that}
     */
    @Override
    public int compare(SecondarySortKey that) {
        int byFirst = Integer.compare(this.first, that.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, that.getSecond());
    }

    /** Same ordering as {@link #compare(SecondarySortKey)}. */
    @Override
    public int compareTo(SecondarySortKey that) {
        return compare(that);
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
然后实现二次排序
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Secondary sort driver: reads lines of two space-separated integers,
 * sorts them by a {@code SecondarySortKey} built from both columns,
 * and prints the original lines in sorted order.
 */
public class SecondarySort {
    public static void main(String[] args) {
        String inputPath ="D:\\spark\\sortnumber.txt";
        SparkConf conf = new SparkConf()
                .setAppName("SecondarySort") // application name
                .setMaster("local");         // run locally
        // Create the JavaSparkContext.
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Pair each line with a composite key built from its two columns.
            JavaPairRDD<SecondarySortKey, String> pairRDD = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
                @Override
                public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                    String[] split = line.split(" ");
                    SecondarySortKey key = new SecondarySortKey(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
                    return new Tuple2<SecondarySortKey, String>(key, line);
                }
            });

            // Sort by the composite key.
            JavaPairRDD<SecondarySortKey, String> sortedPairs = pairRDD.sortByKey();

            // Drop the key and keep only the original line text.
            JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
                @Override
                public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                    return v1._2;
                }
            });

            sortedLines.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
        } finally {
            // Close the context even when the job fails, so it is not leaked.
            sc.close();
        }
    }
}
2、Scala版二次排序
首先定义排序的key
/**
 * Composite key for secondary sorting: orders by `first`, and breaks ties
 * by `second`, both ascending.
 *
 * @param first  primary sort column
 * @param second secondary sort column, used only when `first` ties
 */
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {
  /**
   * Compares by `first`, then by `second`.
   *
   * Uses `Integer.compare` rather than subtraction: a difference such as
   * `Int.MinValue - 1` overflows and would report the wrong sign.
   *
   * @return a negative value, zero, or a positive value when this key is
   *         less than, equal to, or greater than `that`
   */
  def compare(that: SecondarySortKey): Int = {
    val byFirst = Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else Integer.compare(this.second, that.second)
  }
}
然后实现二次排序
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Secondary sort driver: reads lines of two space-separated integers,
 * sorts them by a `SecondarySortKey` built from both columns, and prints
 * the original lines in sorted order.
 */
object SeconDarySort {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\spark\\sortnumber.txt"
    val conf = new SparkConf()
      .setAppName("SeconDarySort")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile(inputPath, 1)
      // Split each line once and pair it with its composite key.
      val pairs = lines.map { line =>
        val fields = line.split(" ")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      val sortedPairs = pairs.sortByKey()
      // Drop the key and keep only the original line text.
      val sortedList = sortedPairs.map(_._2)
      sortedList.foreach(println)
    } finally {
      // Stop the context even when the job fails, so it is not leaked.
      sc.stop()
    }
  }
}
两个版本的排序结果一样(先按第一列升序,第一列相同时再按第二列升序):
1 3
1 5
2 1
2 4
2 6
3 1
3 6
3 7