Spark RDD算子练习题--java版

首先明确:本文原创于:https://blog.csdn.net/qq_40825218/article/details/83720732

话不多说,上数据:

12  张三  25  男   chinese 50
12  张三  25  男   math    60
12  张三  25  男   english 70
12  李四  20  男   chinese 50
12  李四  20  男   math    50
12  李四  20  男   english 50
12  王芳  19  女   chinese 70
12  王芳  19  女   math    70
12  王芳  19  女   english 70
13  张大三 25  男   chinese 60
13  张大三 25  男   math    90
13  张大三 25  男   english 70
13  李大四 24  男   chinese 50
13  李大四 20  男   math    80
13  李大四 20  男   english 50
13  王小芳 19  女   chinese 70
13  王小芳 19  女   math    80
13  王小芳 19  女   english 70

将数据保存在xxx/score.data
需要用到的对象:

package com.hjb.pojo;
/**
 * @DESC sparkTest 使用对象
 * @date 2020年5月24日 下午1:51:01
 * @author HJB
 */
public class ScorePOJO {
    private String classId;//班级编号
    private String name;//姓名
    private int age;//年龄
    private String sex;//性别
    private String course;//课程
    private double score;//分数
    public ScorePOJO() {
        super();
        // TODO Auto-generated constructor stub
    }
    public ScorePOJO(String classId, String name, int age, String sex, String course, double score) {
        super();
        this.classId = classId;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.course = course;
        this.score = score;
    }
    public String getClassId() {
        return classId;
    }
    public void setClassId(String classId) {
        this.classId = classId;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public String getSex() {
        return sex;
    }
    public void setSex(String sex) {
        this.sex = sex;
    }
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public double getScore() {
        return score;
    }
    public void setScore(double score) {
        this.score = score;
    }
    @Override
    public String toString() {
        return "ScorePOJO [classId=" + classId + ", name=" + name + ", age=" + age + ", sex=" + sex + ", course="
                + course + ", score=" + score + "]";
    }
}

下面是代码部分,比较简单

package com.hjb.start;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import com.hjb.pojo.ScorePOJO;
import com.hjb.tools.HadoopHomeUtil;
/**
 * @DESC spark study
 * @date 2020年5月24日 下午1:42:55
 * @author HJB
 */
public class SparkTest1 {
    public static void main(String[] args) {
        //HadoopHomeUtil是spark参数设置,根据自己的情况设置即可
        SparkConf sparkConf = HadoopHomeUtil.initSparkProperties("spark test");
        // 在这里用于加载数据为rdd对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();
        JavaRDD<String> rdd = jsc.textFile("C:\\Users\\Administrator\\Desktop\\spark-test-data\\score.data");
        JavaRDD<ScorePOJO> rdd1 = rdd.map(new Function<String, ScorePOJO>() {
            private static final long serialVersionUID = 1L;
            @Override
            public ScorePOJO call(String s) throws Exception {
                String[] split = s.split("\t");
                ScorePOJO pojo = new ScorePOJO();
                pojo.setClassId(split[0]);
                pojo.setName(split[1]);
                pojo.setAge(Integer.valueOf(split[2]));
                pojo.setSex(split[3]);
                pojo.setCourse(split[4]);
                pojo.setScore(Double.valueOf(split[5]));
                return pojo;
            }
        });
        Dataset<Row> dataset = sparkSession.createDataFrame(rdd1, ScorePOJO.class);
        // 1一共有多少个小于20岁的人参加考试?
        long xiaoYu20 = dataset.where("age < 20").count();
        // 2一共有多少个等于20岁的人参加考试?
        long dengYu20 = dataset.where("age = 20").count();
        // 3一共有多少个大于20岁的人参加考试?
        long daYu20 = dataset.where("age > 20").count();
        // 4一共有多个男生参加考试?
        long nanSheng = dataset.where("sex='男'").count();
        // 5一共有多少个女生参加考试?
        long nvSheng = dataset.where("sex='女'").count();
        // 612班有多少人参加考试?
        long class12 = dataset.where("classId='12'").count();
        // 713班有多少人参加考试?
        long class13 = dataset.where("classId=13").count();
        // 8语文科目的平均成绩是多少?
        List<Row> chineseAvg = dataset.where("course='chinese'").agg(functions.avg("score")).collectAsList();
        // 9 数学科目的平均成绩是多少?
        List<Row> mathAvg = dataset.where("course='math'").agg(functions.avg("score")).collectAsList();
        // 10英语科目的平均成绩是多少?
        List<Row> englishAvg = dataset.where("course='english'").agg(functions.avg("score")).collectAsList();
        // 11单个人平均成绩是多少?
        List<Row> eachAvg = dataset.groupBy("name").agg(functions.avg("score").alias("each_avg")).collectAsList();
        // 12.12班平均成绩是多少?
        List<Row> avg12 = dataset.where("classId='12'").agg(functions.avg("score").alias("avg_12")).collectAsList();
        // 13.12班男生平均总成绩是多少?
        List<Row> avg12m = dataset.where("classId='12' and sex='男'").agg(functions.avg("score").alias("avg_12_m")).collectAsList();
        // 14.12班女生平均总成绩是多少?
        List<Row> avg12f = dataset.where("classId='12' and sex='女'").agg(functions.avg("score").alias("avg_12_f")).collectAsList();
        // 15.13班平均成绩是多少?
        List<Row> avg13 = dataset.where("classId='13'").agg(functions.avg("score").alias("avg_13_f")).collectAsList();
        // 16.13班男生平均总成绩是多少?
        List<Row> avg13m = dataset.where("classId='13' and sex='男'").agg(functions.avg("score").alias("avg_13_m")).collectAsList();
        // 17.13班女生平均总成绩是多少?
        List<Row> avg13f = dataset.where("classId=13 and sex='女'").agg(functions.avg("score").alias("avg_13_f")).collectAsList();
        // 18全校语文成绩最高分是多少?
        List<Row> maxChinese = dataset.where("course='chinese'").agg(functions.max("score")).collectAsList();
        // 19.12班语文成绩最低分是多少?
        List<Row> min12 = dataset.where("classId=12 and sex='男'").agg(functions.min("score")).collectAsList();
        // 20.13班数学最高成绩是多少?
        List<Row> max13Math = dataset.where("classId=13 and course='math'").agg(functions.max("score")).collectAsList();
        // 21.总成绩大于150分的12班的女生有几个?
        long daYu150f13 = dataset.where("classId=13 and sex='女'").agg(functions.sum("score").alias("daYu150")).where("daYu150>150").count();
        // 22.总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?
        List<Row> result1 = dataset.groupBy("name", "age").agg(functions.sum("score").alias("sumScore"), functions.avg("score").alias("avgScore")).where("sumScore > 150 and age>19").collectAsList();
        List<Row> result2 = dataset.select("name").where("course='math' and score>=70").collectAsList();
        
        Set<String> names = new HashSet<String>();
        for (Row row: result2) {
            names.add(row.getString(0));
        }
        System.out.println("result1: " + result1);
        System.out.println("result2: " + result2);
        System.out.println("names: " + names);
        List<Map<String,Object>> result = new ArrayList<>();
        for (Row row: result1) {
            if (names.contains(row.getString(0))) {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("name", row.getString(0));
                map.put("age", row.getInt(1));
                map.put("sumScore", row.get(2));
                map.put("avgScore", row.get(3));
                result.add(map);
            }
        }
        System.out.println("xiaoYu20: " + xiaoYu20);
        System.out.println("dengYu20: " + dengYu20);
        System.out.println("daYu20: " + daYu20);
        System.out.println("nanSheng: " + nanSheng);
        System.out.println("nvSheng: " + nvSheng);
        System.out.println("class12: " + class12);
        System.out.println("class13: " + class13);
        System.out.println("chineseAvg: " + chineseAvg);
        System.out.println("mathAvg: " + mathAvg);
        System.out.println("englishAvg: " + englishAvg);
        System.out.println("eachAvg: " + eachAvg);
        System.out.println("avg12: " + avg12);
        System.out.println("avg12m: " + avg12m);
        System.out.println("avg12f: " + avg12f);
        System.out.println("avg13: " + avg13);
        System.out.println("avg13m: " + avg13m);
        System.out.println("avg13f: " + avg13f);
        System.out.println("maxChinese: " + maxChinese);
        System.out.println("min12: " + min12);
        System.out.println("max13Math: " + max13Math);
        System.out.println("daYu150f13: " + daYu150f13);
        System.out.println("result: " + result);
        jsc.close();
    }
}

附上执行的结果:

result1: [[张大三,25,220.0,73.33333333333333], [张三,25,180.0,60.0]]
result2: [[王芳], [张大三], [李大四], [王小芳]]
names: [张大三, 王小芳, 王芳, 李大四]
xiaoYu20: 6
dengYu20: 5
daYu20: 7
nanSheng: 12
nvSheng: 6
class12: 9
class13: 9
chineseAvg: [[58.333333333333336]]
mathAvg: [[71.66666666666667]]
englishAvg: [[63.333333333333336]]
eachAvg: [[李大四,60.0], [张大三,73.33333333333333], [李四,50.0], [王小芳,73.33333333333333], [张三,60.0], [王芳,70.0]]
avg12: [[60.0]]
avg12m: [[55.0]]
avg12f: [[70.0]]
avg13: [[68.88888888888889]]
avg13m: [[66.66666666666667]]
avg13f: [[73.33333333333333]]
maxChinese: [[70.0]]
min12: [[50.0]]
max13Math: [[90.0]]
daYu150f13: 1
result: [{avgScore=73.33333333333333, sumScore=220.0, name=张大三, age=25}]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。