spark学习笔记-RDD(win+java)

1、def:

​ RDD,弹性分布式数据集(Resilient Distributed Datasets),表示一个只读、分区且不变的数据集合,是spark应用中最核心的部分。spark处理数据时会将一整块数据分割成由多个分块数据组成的数据集(RDD),然后找到多于数据集分块个数的执行器进行数据处理,最终将计算的结果进行汇总。

主要优势 :RDD中的批量错做会根据数据存放的位置来调度任务;对于扫描类型的操作,如果内存不足以缓存整个RDD,就进行部分缓存,避免内存溢出。

2、创建

1)通过已存在的并行集合创建(调用SparkContext的parallelize方法将一个已存在的集合变成RDD):

        //初始化
        private static JavaSparkContext sc;
        //初始化本地配置
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDDDemo");
        //初始化sparkContext对象
        sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        // 原始数据转换成RDD
        List<Integer> list1 = Arrays.asList(5, 4, 3, 2, 1, 5);
        JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        List<Integer> list2 = Arrays.asList(3,4,5,6,7,8);
        JavaRDD<Integer> rdd2 = sc.parallelize(list2);
        System.out.println("rdd1原始数据:" + rdd1.collect());
        System.out.println("rdd2原始数据:" + rdd2.collect());

运行结果:

rdd1原始数据:[5, 4, 3, 2, 1, 5]
rdd2原始数据:[3, 4, 5, 6, 7, 8]

​ parallelize方法还有一个参数是分区(Partitions)数量,它可以用来指定数据集的分区个数,例如上述代码中sc.parallelize(list)可以写成sc.parallelize(list,10),其中10就是数据集分区的个数。集群中的每一个分区对应一个spark任务,每一个cpu计算2-4个分区时较好,若不设置spark就会根据集群的 情况来自动设置分区数量,一般默认与cpu核心数相同。

2)从外部数据集(Dataset)创建

​ spark可以从本地文件系统、文本文件、sequenceFiles、HDFS、Cassandra、HBase、Amazon S3以及Hadoop所支持的任何存储源中创建RDD。通过SparkContext的textFile方法将数据源文件转换成RDD,此方法的参数为文件地址。转换后的数据将会以行集合的方式进行存储,例如:

 JavaRDD<String> rdd = sc.textFile("wordCount");
 System.out.println("原始数据:" + rdd.collect());

运行结果:

原始数据:[here, where, my, your, hello, world, test, file, jump, you, can, you, jump]

注:当使用本地文件系统进行读取操作转换时,必须保证所有工作节点在相同路径下能够访问该文件,可以将文件复制到所有工作节点的相同目录下,或者使用共享文件系统。

3、操作

  1. 转换(transformations):在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作规程,所有的计算都发生在actions环节。

    • map转换

      依次取出RDD中的每一个元素,传给表达式进行转换,返回转换后的结果。

         /**
           * 对每个元素进行操作(+10),返回一个新的RDD
           */
          public static void map(JavaRDD<Integer> rdd) {
              System.out.println("RDD每个元素加10:" + rdd.map(v -> v + 10).collect());
          }System.out.println("RDD每个元素乘10:" + rdd.map(v -> v + 10).collect());
      

      运行结果:

      RDD每个元素加10:[15, 14, 13, 12, 11, 15]
      
  • filter转换

    返回符合指定过滤条件的元素的列表

       /**
         * 最每个元素进行筛选,返回符合条件的元素组成的一个新RDD
         */
        public static void filter(JavaRDD<Integer> rdd) {
            System.out.println("RDD去掉不为5的元素:" + rdd.filter(v -> v != 5).collect());
        }
    

    运行结果:

    RDD去掉不为5的元素:[4, 3, 2, 1]
    
  • union转换

    求给定RDD的并集∪

       /**
         * union
         */
        public static void union(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){
            System.out.println("rdd1并rdd2:"+rdd1.union(rdd2).collect());
        }
    

    运行结果

    rdd1并rdd2:[5, 4, 3, 2, 1, 5, 3, 4, 5, 6, 7, 8]
    
  • intersection转换

    求交集

       /**
         * 交
         */
        public static void intersection(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){
            System.out.println("rdd1交rdd2:"+rdd1.intersection(rdd2).collect());
        }
    

    运行结果

    rdd1交rdd2:[4, 3, 5]
    
  • distinct转换

    去掉重复值

       /**
         * 去重操作
         */
        public static void distinct(JavaRDD<Integer> rdd) {
            System.out.println("RDD去重操作:" + rdd.distinct().collect());
        }
    

    运行结果

    RDD去重操作:[4, 1, 3, 5, 2]
    
  1. 动作(actions):执行记录的所有transformations操作并计算结果,结果可返回到driver程序,也可保存到相关存储系统中。
  • reduce(f)动作

    对所给的RDD进行聚合

     /**
         * 并行整合RDD中所有数据
         */
        public static void reduce(JavaRDD<Integer> rdd) {
            System.out.println("整合RDD中所有数据(sum):" + rdd.reduce((v1, v2) -> v1 + v2));
        }
    

    运行结果:

    整合RDD中所有数据(sum):20
    
  • collect()动作

    返回一个包含RDD所有元素的list

  • take(num)动作

    返回给定RDD的前n个元素的list

     /**
         * 取出rdd返回num个元素 返回一个list
         */
        public static void take(JavaRDD<Integer> rdd) {
            System.out.println("取出rdd返回2个元素:" + rdd.take(2));
        }
    

    运行结果:

    取出rdd返回2个元素:[5, 4]
    
  • first()动作

    返回RDD中第一个元素的值

     public static void getFirst(JavaRDD<Integer> rdd){
            System.out.println("第一个元素:"+rdd.first());
        }
    

    运行结果

    第一个元素:5
    
  • top(num)动作

    取出RDD中前num个最大值

    public static void getTop(JavaRDD<Integer> rdd){
            System.out.println("前三个"+rdd.top(3));
        }
    

    运行结果

    前三个[5, 5, 4]
    
  • foreach()动作

    遍历RDD

    public static void foreach(JavaRDD<Integer> rdd) {
            System.out.print("foreach:");
            rdd.foreach(t -> System.out.print(t+" "));
        }
    

    运行结果:

    foreach:5 4 3 2 1 5      
    
  • count()动作

    对RDD中所有元素进行计数

    public static void count(JavaRDD<Integer> rdd) {
            System.out.println("统计RDD的所有元素:" + rdd.count());
        }        
    

    运行结果:

    统计RDD的所有元素:6
    
  • countByValue()动作

    按值计数

    /**
         * 每个元素出现的次数
         */
        public static void countByValue(JavaRDD<Integer> rdd) {
            System.out.println("每个元素出现的次数:" + rdd.countByValue());
        }
    

    运行结果

    每个元素出现的次数:{5=2, 1=1, 2=1, 3=1, 4=1}
    

PS :lambda表达式不支持解决方法:

case1:maven配置文件setting.xml中JDK改为8以上:

    <profile>
      <id>jdk-1.8</id>

      <activation>
        <activeByDefault>true</activeByDefault>
        <jdk>1.8</jdk>
      </activation>

       <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

      <repositories>
        <repository>
          <id>jdk18</id>
          <name>Repository for JDK 1.8 builds</name>
          <url>http://www.myhost.com/maven/jdk18</url>
          <layout>default</layout>
          <snapshotPolicy>always</snapshotPolicy>
        </repository>
      </repositories>
    </profile>

case2:

file --> Project Structure -->modules

将language level改为8以上

apply-->ok

感谢:

https://www.cnblogs.com/diaozhaojian/p/9152530.html

https://www.jianshu.com/p/d573573dd97f

https://www.cnblogs.com/dhrwawa/p/10981167.html

https://blog.csdn.net/wolf2s/article/details/78958275?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容

  • DAY 05 1、 public classArrayDemo { public static void mai...
    周书达阅读 660评论 0 0
  • 第一章 初识javaJAVA 第一讲:什么是程序?:为了让计算机执行某些操作或解决某个问题而编写的一系列有序指令的...
    人子日月几点阅读 517评论 0 1
  • 设计模式分类 总体来说设计模式分为三大类:创建型模式,共五种:工厂方法模式、抽象工厂模式、单例模式、建造者模式、原...
    lifeline丿毅阅读 1,209评论 0 2
  • 50道经典Java编程练习题,将数学思维运用到编程中来。抱歉哈找不到文章的原贴了,有冒犯的麻烦知会声哈~ 1.指数...
    OSET我要编程阅读 6,960评论 0 9
  • http://spark.apache.org/docs/latest/api/python/index.html...
    mpro阅读 6,094评论 0 4