The combineByKey Operator
What it does:
combineByKey aggregates the elements of each partition of a pair RDD, where every element is a two-element tuple (key, value). It is similar in spirit to the base-RDD function aggregate(): it lets you return a result whose type differs from that of the input values.
Each parameter of combineByKey corresponds to one stage of the aggregation, so understanding this function goes a long way toward understanding how Spark operates on RDDs.
Parameters:
createCombiner: within a partition, builds the initial combiner from the first value seen for a key
mergeValue: within a partition, merges each subsequent value for that key into the existing combiner
mergeCombiners: merges the per-partition combiners for a key across partitions
partitioner: a custom Partitioner (another overload, used below, takes just the number of partitions); defaults to HashPartitioner
mapSideCombine: whether to combine on the map side before shuffling; defaults to true
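These parameters map directly onto the overloads that JavaPairRDD exposes. For orientation, here is a rough sketch of the fullest Java-side signature (abridged from the API; the example below uses the shorter overload that takes an int numPartitions in place of the last three parameters):

public <C> JavaPairRDD<K, C> combineByKey(
        Function<V, C> createCombiner,      // V -> C: build a combiner from the first value
        Function2<C, V, C> mergeValue,      // (C, V) -> C: fold another value into the combiner
        Function2<C, C, C> mergeCombiners,  // (C, C) -> C: merge combiners across partitions
        Partitioner partitioner,
        boolean mapSideCombine,
        Serializer serializer)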
Example:
Suppose we have five pairs: (男, 李四), (男, 张三), (女, 韩梅梅), (女, 李思思), (男, 马云), and we want to collapse them by key ("男"/male, "女"/female) into
男,([李四, 张三, 马云],3) and 女,([韩梅梅, 李思思],2), that is, for each key, the list of names together with their count. combineByKey is the most direct tool for this: within each partition, createCombiner turns the first value seen for a key into ([name], 1) and mergeValue appends each subsequent name while incrementing the count; mergeCombiners then concatenates the lists and sums the counts across partitions. Enough talk, here is the code.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * @author cao kangle
 * @Type CombineByKey.java
 * @Desc Merge the values that share a key into a result of a different type
 * @date 2020/5/25 16:21
 */
public class CombineByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("CombineByKey");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaRDD<Tuple2<String, String>> parallelize = sparkContext.parallelize(Arrays.asList(
                new Tuple2<>("男", "李四"),
                new Tuple2<>("男", "张三"),
                new Tuple2<>("女", "韩梅梅"),
                new Tuple2<>("女", "李思思"),
                new Tuple2<>("男", "马云")));
        // Print the raw input pairs.
        parallelize.foreach(new VoidFunction<Tuple2<String, String>>() {
            @Override
            public void call(Tuple2<String, String> tuple) throws Exception {
                System.out.println(tuple._1 + "," + tuple._2);
            }
        });
        // combineByKey lives on JavaPairRDD, so repackage the tuples as key-value pairs.
        // (sparkContext.parallelizePairs(...) would produce a JavaPairRDD directly.)
        JavaPairRDD<String, String> pairRDD = parallelize.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, String> tuple) throws Exception {
                return new Tuple2<>(tuple._1, tuple._2);
            }
        });
        JavaPairRDD<String, Tuple2<List<String>, Integer>> combineRDD = pairRDD.combineByKey(
                new Function<String, Tuple2<List<String>, Integer>>() {
                    // createCombiner: called within a partition for the first value of a key
                    // that has not been seen in that partition yet.
                    @Override
                    public Tuple2<List<String>, Integer> call(String name) throws Exception {
                        List<String> list = new ArrayList<>();
                        list.add(name);
                        return new Tuple2<>(list, 1);
                    }
                },
                new Function2<Tuple2<List<String>, Integer>, String, Tuple2<List<String>, Integer>>() {
                    // mergeValue: within a partition, folds another value into the existing combiner.
                    @Override
                    public Tuple2<List<String>, Integer> call(Tuple2<List<String>, Integer> acc, String name) throws Exception {
                        List<String> list = acc._1;
                        list.add(name);
                        return new Tuple2<>(list, acc._2 + 1);
                    }
                },
                new Function2<Tuple2<List<String>, Integer>, Tuple2<List<String>, Integer>, Tuple2<List<String>, Integer>>() {
                    // mergeCombiners: merges the per-partition combiners across partitions.
                    @Override
                    public Tuple2<List<String>, Integer> call(Tuple2<List<String>, Integer> acc1, Tuple2<List<String>, Integer> acc2) throws Exception {
                        List<String> list = acc1._1;
                        list.addAll(acc2._1);
                        return new Tuple2<>(list, acc1._2 + acc2._2);
                    }
                },
                // Number of partitions for the result; the partitioner defaults to HashPartitioner.
                2);
        combineRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<List<String>, Integer>>>() {
            @Override
            public void call(Tuple2<String, Tuple2<List<String>, Integer>> tuple) throws Exception {
                System.out.println(tuple._1 + "," + tuple._2);
            }
        });
        sparkContext.close();
    }
}
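On this input the job should print something like 男,([李四, 张三, 马云],3) and 女,([韩梅梅, 李思思],2); the exact ordering of the keys, and of the names inside each list, depends on how the records land in partitions. Since Spark's Function and Function2 are single-method interfaces, the anonymous classes above can also be written as Java 8 lambdas. A minimal sketch of the same aggregation (equivalent logic, same overload):

JavaPairRDD<String, Tuple2<List<String>, Integer>> combined = pairRDD.combineByKey(
        // createCombiner: first value seen for a key in a partition
        name -> {
            List<String> list = new ArrayList<>();
            list.add(name);
            return new Tuple2<>(list, 1);
        },
        // mergeValue: fold another value into the per-partition combiner
        (acc, name) -> {
            acc._1.add(name);
            return new Tuple2<>(acc._1, acc._2 + 1);
        },
        // mergeCombiners: merge combiners across partitions
        (a, b) -> {
            a._1.addAll(b._1);
            return new Tuple2<>(a._1, a._2 + b._2);
        },
        2);
combined.foreach(t -> System.out.println(t._1 + "," + t._2));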