思路
首先按照月份来分组,对组内的数据按照温度来排序
取温度最高的前两名,然后分组取RDD
代码
public class TopNTemperature {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("TopNTemperature").setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> weatherRdd = sc.textFile("weather");
weatherRdd = weatherRdd.cache();
JavaPairRDD<String, List<Double>> top2TemperaturePerMonthRDD = weatherRdd.mapToPair(new PairFunction<String, String,Double>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Double> call(String log) throws Exception {
String[] splited = log.split("\t");
String date = splited[0].substring(0, 7);
Double temperature = Double.parseDouble(splited[1].trim());
return new Tuple2<String, Double>(date,temperature);
}
})//生成(年月,温度)的RDD
.groupByKey()//按照月份分组
.mapToPair(new PairFunction<Tuple2<String,Iterable<Double>>, String,List<Double>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, List<Double>> call(Tuple2<String, Iterable<Double>> tuple) throws Exception {
String date = tuple._1;
Iterator<Double> iterator = tuple._2.iterator();
Double[] temperatures = new Double[2];
//list
while(iterator.hasNext()){
//遍历每一个月份
Double temperature = iterator.next();
for(int i = 0 ;i < temperatures.length ; i++){
//遍历每一个月份里的温度
if(temperatures[i] == null){
//判断某一个月份里是否只有一个温度,如果有不用排序了
temperatures[i] = temperature;
break;
}else{
if(temperature > temperatures[i]){
/**
* 有两个以上的温度,对Value值排序,取出最大的两个数,类似于冒泡法
* temperature[]这个数组只能装下两个元素多于的就被淘汰了
*/
for(int j = (temperatures.length-1) ; j > i ;j--){
temperatures[j] = temperatures[j-1];
}
temperatures[i] = temperature;
break;
}
}
}
}
//Collections.sort()
List<Double> asList = Arrays.asList(temperatures);
return new Tuple2<String, List<Double>>(date,asList);
}
});
List<Tuple2<String, List<Double>>> collect = top2TemperaturePerMonthRDD.collect();
for(Tuple2<String,List<Double>> t:collect){
//collect算子:Action算子,能够将每一个task的计算结果回收到Driver端,一般用于测试
String date = t._1;
List<Double> topN = t._2;
System.out.println("date:" + date + "\tTemperature:" + topN.get(0) + "\t" + topN.get(1));
}
sc.stop();
}
}
关于serialVersionUID
serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。