Spark高级编程之TopN及分组TopN

1、获取文本内最大的前三个数字
输入数据:

3
54
44
2
67
32
133
54
23
1
35
23
73
32
16
78
21
56

1)Java版top3:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * 取最大的前3个数字
 */
public class Topn {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Topn")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("D:\\spark\\top3.txt");
        JavaPairRDD<Integer, String> pairs = lines.mapToPair(new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String s) throws Exception {
                return new Tuple2<>(Integer.valueOf(s),s);
            }
        });
        JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false);
        JavaRDD<Integer> sortedNumbers = sortedPairs.map(new Function<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, String> v1) throws Exception {
                return v1._1;
            }
        });
        List<Integer> list = sortedNumbers.take(3);
        System.out.println(Arrays.toString(list.toArray()));

        sc.close();

    }
}

输出结果:

[133, 78, 73]

2)Scala版top3

import org.apache.spark.{SparkConf, SparkContext}

object Topn {

  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\spark\\top3.txt"
    val  conf = new SparkConf()
      .setAppName("Topn")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines =sc.textFile(inputPath,1)
    val pairs = lines.map(line =>(line.toInt,line))
    val sortedPairs = pairs.sortByKey(false)
    val sortedNumbers = sortedPairs.map(_._1)
    val topNumber = sortedNumbers.take(3)
    for (num <- topNumber){
      println(num)
    }
    sc.stop()
  }

}

输出结果:

133
78
73

2、对每个班级的学生成绩,取出前3名(分组取topn)
输入数据:

class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77

1)Java版分组取topn

/**
 * 分组取Top3
 */
public class GroupTop3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Topn")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("D:\\spark\\grouptop3.txt");
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] split = s.split(" ");
                return new Tuple2<>(split[0], Integer.valueOf(split[1]));
            }
        });

        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> top3Scores = groupedPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                Integer[] top3 = new Integer[3];
                String className = classScores._1;
                Iterator<Integer> scores = classScores._2.iterator();
                while (scores.hasNext()){
                    Integer score = scores.next();
                    for(int i=0;i<3;i++){
                        if(top3[i]==null){
                            top3[i] = score;
                            break;
                        }else if(score>top3[i]){
                            for(int j=top3.length-1;j>i;j--){
                                top3[j]=top3[j-1];
                            }
                            top3[i] = score;
                            break;
                        }
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(className,Arrays.asList(top3));
            }
        });

        top3Scores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> top3Score) throws Exception {
                System.out.println("class:"+top3Score._1);
                Iterator<Integer> integers = top3Score._2.iterator();
                while (integers.hasNext()){
                    System.out.println(integers.next());
                }
                System.out.println("=================================");

            }
        });
        sc.close();
    }
}

输出结果:

class:class1
95
90
87
=================================
class:class2
88
87
77
=================================

2)scala版分组取topn

object GroupTop3 {

  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\spark\\grouptop3.txt"
    val  conf = new SparkConf()
      .setAppName("GroupTop3")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines =sc.textFile(inputPath,1)
    val pairs = lines.map(line =>(line.split(" ")(0),line.split(" ")(1).toInt))
    val groupedPairs = pairs.groupByKey()
    val groupscores = groupedPairs.map(classScore =>{
      val className = classScore._1;
      val scores = classScore._2.iterator
      val top3 = new Array[Integer](3)
      var flag = true
      for (score <- scores){
        var flag = true
        for (i <- 0 until top3.length if flag){
          if(top3(i)==null){
            top3(i) = score
            flag = false
          }else if(score>top3(i)){
            for(j <- top3.length - 1 to i+1 by -1){
              top3(j) = top3(j-1);
            }
            top3(i) = score;
            flag = false
          }
        }
      }
      (className,top3)
    })

    for(groupscore<-groupscores){
      println("class:"+groupscore._1)
      val scores = groupscore._2
      for(score<- scores){
        println(score)
      }
      println("=============================")
    }

    sc.stop()
  }

}

输出结果:

class:class1
95
90
87
=============================
class:class2
88
87
77
=============================

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容