spark combineByKey 示例(java)

                                                combineByKey 算子

函数功能:

聚合各分区的元素,而每个元素都是二元组。功能与基础RDD函数aggregate()差不多,可让用户返回与输入数据类型不同的返回值。

combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以,理解此函数对Spark如何操作RDD会有很大帮助。

参数解析:

createCombiner:分区内 创建组合函数

mergeValue:分区内 合并值函数

mergeCombiners:多分区 合并组合器函数

partitioner:自定义分区数,默认为HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

示例:

假如现有 男,李四 男,张三 女,韩梅梅 女,李思思 男,马云 这样五对数据,我想把它统计成

男,([李四, 张三,马云],3) 女,([韩梅梅, 李思思],2) 这样格式的数据,我们最简便的就是用combineByKey算子了,废话不多说 上代码。


import cn.hutool.core.collection.CollectionUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author cao kangle
 * @Type CombineByKey.java
 * @Desc  使用不同的返回类型合并具有相同键的值
 * @date 2020/5/25 16:21
 */
public class CombineByKey {

    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setMaster("local").setAppName("CombineByKey");

        JavaSparkContext sparkContext=new JavaSparkContext(conf);

        JavaRDD<Tuple2<String, String>> parallelize = sparkContext.parallelize(Arrays.asList(new Tuple2<>("男", "李四"), new Tuple2<>("男", "张三"), new Tuple2<>("女", "韩梅梅"), new Tuple2<>("女", "李思思"), new Tuple2<>("男", "马云")));

        parallelize.foreach(new VoidFunction<Tuple2<String, String>>() {
            @Override
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println(stringStringTuple2._1+","+stringStringTuple2._2);
            }
        });


        JavaPairRDD<String, String> pairRDD = parallelize.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, String> xx) throws Exception {
                return new Tuple2<>(xx._1, xx._2);
            }
        });

        JavaPairRDD<String, Tuple2<List<String>, Integer>> combineRDD = pairRDD.combineByKey(new Function<String, Tuple2<List<String>, Integer>>() {
            //createCombiner:分区内 创建组合函数 每个分区内 遇到的第一个之前没有遇到过的元素 走这个方法
            @Override
            public Tuple2<List<String>, Integer> call(String s) throws Exception {
                List<String> list = new ArrayList<>();
                list.add(s);

                return new Tuple2<>(list, 1);
            }
        }, new Function2<Tuple2<List<String>, Integer>, String, Tuple2<List<String>, Integer>>() {
            //分区内 合并值函数
            @Override
            public Tuple2<List<String>, Integer> call(Tuple2<List<String>, Integer> listIntegerTuple2, String s) throws Exception {
                List<String> list = listIntegerTuple2._1;
                list.add(s);
                int x = listIntegerTuple2._2 + 1;

                return new Tuple2<>(list, x);
            }
        }, new Function2<Tuple2<List<String>, Integer>, Tuple2<List<String>, Integer>, Tuple2<List<String>, Integer>>() {
            //多分区 合并组合器函数
            @Override
            public Tuple2<List<String>, Integer> call(Tuple2<List<String>, Integer> listIntegerTuple2, Tuple2<List<String>, Integer> listIntegerTuple22) throws Exception {
                List<String> list = listIntegerTuple2._1;
                list.addAll(listIntegerTuple22._1);
                int x = listIntegerTuple2._2 +listIntegerTuple22._2;
                return new Tuple2<>(list, x);
            }
        },
                //自定义分区数,默认为HashPartitioner
                2);

        combineRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<List<String>, Integer>>>() {
            @Override
            public void call(Tuple2<String, Tuple2<List<String>, Integer>> stringTuple2Tuple2) throws Exception {
                System.out.println( stringTuple2Tuple2._1+","+stringTuple2Tuple2._2);
            }
        });


    }
}  
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。