A List of Common RDD Operators

1. Common Transformation Operators

Operator Description
map Passes each element of the RDD through a user-defined function and builds a new RDD from the results.
filter Evaluates a predicate on every element of the RDD; elements that return true are kept, elements that return false are dropped.
flatMap Like map, but each input element may produce zero or more output elements.
groupByKey Groups elements by key; each key maps to an Iterable<value>.
reduceByKey Reduces the multiple values of each key into a single value with the given reduce function (see the word-count note after this table for why it is usually preferred over groupByKey for aggregation).
sortByKey Sorts the elements by key, ascending by default; pass false for descending order, or supply a custom comparator (a sketch of that appears after the sortByKey example below).
join Joins two RDDs of <key, value> pairs, much like a table join in MySQL, returning <key, (value1, value2)>.
cogroup Like join, but the tuple for each key holds two Iterables, i.e. <key, (Iterable<value1>, Iterable<value2>)>.
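
A note on groupByKey vs. reduceByKey: for aggregations, reduceByKey is usually the better choice because it combines values on the map side before the shuffle, whereas groupByKey ships every raw value across the network. A minimal word-count sketch illustrating the pattern (it assumes a local JavaSparkContext named sc, set up exactly as in the examples below):

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello you", "hello me"));
JavaPairRDD<String, Integer> counts = lines
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) // one word per element
        .mapToPair(w -> new Tuple2<>(w, 1))                   // (word, 1) pairs
        .reduceByKey(Integer::sum);                           // combine per key, map-side first
counts.foreach(t -> System.out.println(t._1 + ": " + t._2));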

2. Hands-on Practice

Java version

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.*;

public class TransformationStudy {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap();
        //groupByKey();
        //reduceByKey();
        //sortByKey();
        //join();
        cogroup();
    }

    private static void map(){
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("warn");
        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        JavaRDD<Integer> multiNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer * 2;
            }
        });
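        // Equivalent and more concise with a Java 8 lambda: numberRDD.map(i -> i * 2)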
        multiNumberRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
        sc.close();
    }


    private static void filter(){
        SparkConf conf = new SparkConf()
                .setAppName("filter")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
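                // The lowest bit of an even number is 0, so this keeps even values only.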
                return (integer & 1) == 0;
            }
        });

        evenNumberRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        sc.close();
    }


    private static void flatMap(){
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<String> list = Arrays.asList("hello you", "hello me", "hello world");

        JavaRDD<String> lines = sc.parallelize(list);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split(" ");
                List<String> ls = new ArrayList<>(split.length);
                Collections.addAll(ls, split);
                return ls.iterator();
            }
        });
        words.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sc.close();
    }

    private static void groupByKey(){
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<Tuple2<String,Integer>> list = Arrays.asList(
                new Tuple2<>("class1",90),
                new Tuple2<>("class2",80),
                new Tuple2<>("class1",75),
                new Tuple2<>("class2",65)
        );

        JavaPairRDD<String,Integer> scoresRDD = sc.parallelizePairs(list);

        JavaPairRDD<String,Iterable<Integer>> groupedScoresRDD = scoresRDD.groupByKey();


        groupedScoresRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                System.out.println("class:" + stringIterableTuple2._1);
                for (Integer a_2 : stringIterableTuple2._2) {
                    System.out.println(a_2);
                }
                System.out.println("================================");
            }
        });

        sc.close();
    }


    private static void reduceByKey(){
        SparkConf conf = new SparkConf()
                .setAppName("reduceByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<Tuple2<String,Integer>> list = Arrays.asList(
                new Tuple2<>("class1",90),
                new Tuple2<>("class2",80),
                new Tuple2<>("class1",75),
                new Tuple2<>("class2",65)
        );

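        // Pair each score with a count of 1 so that sums and counts can be reduced together.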
        JavaPairRDD<String,Tuple2<Integer,Integer>> scoresRDD = sc.parallelizePairs(list).mapValues(new Function<Integer, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> call(Integer integer) throws Exception {
                return new Tuple2<Integer,Integer>(integer,1);
            }
        });

        JavaPairRDD<String,Tuple2<Integer,Integer>> reducedScoresRDD = scoresRDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) throws Exception {
                // Sum the scores (_1) and the counts (_2) pairwise.
                return new Tuple2<>(a._1 + b._1, a._2 + b._2);
            }
        });

        reducedScoresRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
            @Override
            public void call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
                System.out.println(stringTuple2Tuple2._1 + ":" + stringTuple2Tuple2._2._1.doubleValue() / stringTuple2Tuple2._2._2);
            }
        });

        sc.close();
    }


    private static void sortByKey(){
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<Tuple2<Integer,String>> list = Arrays.asList(
                new Tuple2<>(75,"leo"),
                new Tuple2<>(50,"tom"),
                new Tuple2<>(100,"marray"),
                new Tuple2<>(86,"jack")
        );

        JavaPairRDD<Integer,String> scoresRDD = sc.parallelizePairs(list);

        JavaPairRDD<Integer,String> sortedScoresRDD = scoresRDD.sortByKey(false);

        sortedScoresRDD.foreach(x->{
            System.out.println(x._2+": "+x._1);
        });

        sc.close();
    }
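
    // A minimal sketch of sortByKey with a custom comparator (descending order
    // here). The comparator must be Serializable because Spark ships it to the
    // executors; the intersection cast below makes the lambda Serializable.
    private static void sortByKeyWithComparator(){
        SparkConf conf = new SparkConf()
                .setAppName("sortByKeyWithComparator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("warn");
        JavaPairRDD<Integer,String> scoresRDD = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>(75,"leo"),
                new Tuple2<>(50,"tom"),
                new Tuple2<>(100,"marray")
        ));
        scoresRDD.sortByKey((Comparator<Integer> & java.io.Serializable) (a, b) -> b.compareTo(a))
                .foreach(x -> System.out.println(x._2 + ": " + x._1));
        sc.close();
    }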

    private static void join(){
        SparkConf conf = new SparkConf()
                .setAppName("join")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<Tuple2<Integer,String>> students = Arrays.asList(
                new Tuple2<>(1,"leo"),
                new Tuple2<>(2,"tom"),
                new Tuple2<>(3,"marray"),
                new Tuple2<>(4,"jack")
        );

        List<Tuple2<Integer,Integer>> scores = Arrays.asList(
                new Tuple2<>(1,75),
                new Tuple2<>(2,50),
                new Tuple2<>(3,100),
                new Tuple2<>(4,86)
        );

        JavaPairRDD<Integer,String> studentsRDD = sc.parallelizePairs(students);
        JavaPairRDD<Integer,Integer> scoresRDD = sc.parallelizePairs(scores);

        JavaPairRDD<Integer,Tuple2<String,Integer>> joinedRDD = studentsRDD.join(scoresRDD).sortByKey();

        joinedRDD.foreach(x->{
            System.out.println("students id : " + x._1);
            System.out.println("students name : " + x._2._1);
            System.out.println("students score : " + x._2._2);
            System.out.println("==================================");
        });

        sc.close();
    }


    private static void cogroup(){
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        List<Tuple2<Integer,String>> students = Arrays.asList(
                new Tuple2<>(1,"leo1"),
                new Tuple2<>(2,"tom1"),
                new Tuple2<>(3,"marray1"),
                new Tuple2<>(4,"jack1"),
                new Tuple2<>(1,"leo2"),
                new Tuple2<>(2,"tom2"),
                new Tuple2<>(3,"marray2"),
                new Tuple2<>(4,"jack2")

        );


        List<Tuple2<Integer,Integer>> scores = Arrays.asList(
                new Tuple2<>(1,75),
                new Tuple2<>(2,50),
                new Tuple2<>(3,100),
                new Tuple2<>(4,86),
                new Tuple2<>(1,67),
                new Tuple2<>(2,61),
                new Tuple2<>(3,98),
                new Tuple2<>(4,78)
        );

        JavaPairRDD<Integer,String> studentsRDD = sc.parallelizePairs(students);
        JavaPairRDD<Integer,Integer> scoresRDD = sc.parallelizePairs(scores);

        JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> cogroupRDD = studentsRDD.cogroup(scoresRDD).sortByKey();

        cogroupRDD.foreach(x->{
            System.out.println("students id : " + x._1);
            System.out.println("students name : " + x._2._1);
            System.out.println("students score : " + x._2._2);
            System.out.println("==================================");
        });

        sc.close();
    }
}

Scala version

import org.apache.spark.{SparkConf, SparkContext}

object TransformationStudy {

  def main(args: Array[String]): Unit = {
    cogroup()
  }

  private def map(): Unit ={
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array(1,2,3,4,5,6)
    val mapRDD = sc.parallelize(list,1)
    mapRDD.map(x=>2*x).foreach(x=>println(x))
    sc.stop()
  }

  private def filter(): Unit ={
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array(1,2,3,4,5,6,7,8,9,10)
    val listRDD = sc.parallelize(list,1)
    listRDD.filter(x => (x & 1) == 0).foreach(x => println(x))
    sc.stop()
  }

  private def flatMap(): Unit ={
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array("hello me", "hello you", "hello world")
    val listRDD = sc.parallelize(list,1)
    listRDD.flatMap(x=>x.split(" ")).foreach(x=>println(x))
    sc.stop()
  }

  private def groupByKey(): Unit ={
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array(("class1", 80),("class2",76),("class1",90),("class2",93))
    val listRDD = sc.parallelize(list,1)
    listRDD.groupByKey().foreach(x=>{
      println(x)
    })
    sc.stop()
  }

  private def reduceByKey(): Unit ={
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array(("class1", 80),("class2",76),("class1",90),("class2",93))
    val listRDD = sc.parallelize(list,1)
    // Pair each score with a count of 1 so sums and counts reduce together.
    val mappedListRDD = listRDD.map(x => (x._1, (x._2, 1)))
    mappedListRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map(x => (x._1, x._2._1.toDouble / x._2._2))
      .foreach(println)
    sc.stop()
  }

  private def sortByKey(): Unit ={
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val list = Array((75, "leo"), (50, "tom"), (100, "marry"), (86, "jack"))
    val listRDD = sc.parallelize(list,1)
    listRDD.sortByKey(ascending = false).foreach(println)
    sc.stop()
  }

  private def join(): Unit ={
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
    val scores = Array((1, 75), (2, 50), (3, 100), (4, 86))
    val studentsRDD = sc.parallelize(students,1)
    val scoresRDD = sc.parallelize(scores,1)
    studentsRDD.join(scoresRDD).sortByKey().foreach(println)
    sc.stop()
  }

  private def cogroup(): Unit ={
    val conf = new SparkConf().setAppName("cogroup").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")
    val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
    val scores = Array((1, 75), (2, 50), (3, 100), (4, 86),(1, 33), (2, 45), (3, 99), (4, 67))
    val studentsRDD = sc.parallelize(students,1)
    val scoresRDD = sc.parallelize(scores,1)
    studentsRDD.cogroup(scoresRDD).sortByKey().foreach(println)
    sc.stop()
  }
}

3. Common Action Operators

Operator Description
reduce Aggregates all elements of the RDD: the first element is combined with the second, the result with the third, then with the fourth, and so on.
collect Fetches every element of the RDD back to the local driver; with very large datasets this can cause heavy network traffic.
count Returns the total number of elements in the RDD.
take(n) Returns the first n elements of the RDD.
saveAsTextFile Saves the RDD's elements to text files, calling toString on each element; it produces as many files as there are tasks. To get a single output file, call coalesce(1, true).saveAsTextFile() on the RDD, which makes Spark run only one task for the write and therefore emit only one file; alternatively call repartition(1), which is just a wrapper around coalesce with the shuffle flag set to true (a sketch follows this table).
countByKey Counts the number of values for each key.
foreach Iterates over each element of the RDD; unlike collect, foreach runs on the worker nodes rather than bringing the data back to the driver first, which performs better.
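
To make the single-output-file trick described above concrete, a minimal sketch (assuming the same local JavaSparkContext sc that the examples below set up):

JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5), 3);
// coalesce(1, true) shuffles the data down to a single partition, so only one
// task performs the write and exactly one part file is produced.
numberRDD.coalesce(1, true).saveAsTextFile("./single-output");
// repartition(1) would do the same: it is coalesce with shuffle = true.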

4. Hands-on Practice

Java version

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.*;

public class ActionStudy {

    public static void main(String[] args) {
        //reduce();
        //collect();
        //count();
        //take();
        //saveAsTextFile();
        countByKey();
    }

    private static void reduce(){
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),1);

        Integer sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println(sum);

        sc.close();
    }


    private static void collect(){
        SparkConf conf = new SparkConf()
                .setAppName("collect")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        List<Integer> numbers = numberRDD.collect();

        for (Integer e : numbers){
            System.out.println(e);
        }

        sc.close();
    }


    private static void count(){
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        long count = numberRDD.count();

        System.out.println(count);

        sc.close();
    }


    private static void take(){
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        List<Integer> numbers = numberRDD.take(3);

        for (Integer e : numbers){
            System.out.println(e);
        }

        sc.close();
    }




    private static void saveAsTextFile(){
        SparkConf conf = new SparkConf()
                .setAppName("saveAsTextFile")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

        JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),3);

        numberRDD.saveAsTextFile("./output");

        sc.close();
    }


    private static void countByKey(){
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.setLogLevel("warn");

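        // Each key appears twice in the list below, so countByKey should report 2 per key.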
        List<Tuple2<Integer,String>> list = Arrays.asList(
                new Tuple2<>(75,"leo"),
                new Tuple2<>(50,"tom"),
                new Tuple2<>(100,"marray"),
                new Tuple2<>(86,"jack"),
                new Tuple2<>(75,"leo"),
                new Tuple2<>(50,"tom"),
                new Tuple2<>(100,"marray"),
                new Tuple2<>(86,"jack")
        );

        JavaPairRDD<Integer,String> scoresRDD = sc.parallelizePairs(list);

        Map<Integer,Long> countMap = scoresRDD.countByKey();

        System.out.println(countMap);

        sc.close();
    }
}