RDD Explained

1. Three ways to create an RDD
1) Creating an RDD from a collection in the program
The Java version is as follows:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

/**
 * Create an RDD by parallelizing an in-memory collection.
 * Example: sum the numbers 1 to 10.
 */
public class ParallelizeCollection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ParallelizeCollection") // application name
                .setMaster("local");                 // run locally
        JavaSparkContext jsc = new JavaSparkContext(conf);
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numberRDD = jsc.parallelize(numbers);
        Integer count = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2; // accumulate the sum
            }
        });
        System.out.println("Count=" + count);
        jsc.close();
    }
}

The Scala version is as follows:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeCollection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ParallelizeCollection") // application name
      .setMaster("local")                  // run locally
    val sc = new SparkContext(conf)        // entry point for Spark Core
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rddList = sc.parallelize(list)
    val sum = rddList.reduce(_ + _)
    println("sum:" + sum)
    sc.stop()
  }
}

2) Creating an RDD from a local file
The word-count program is the classic example; see the sketch after item 3).
3) Creating an RDD from an HDFS file
The same word-count example applies; only the file path changes.
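
Both variants use sc.textFile; only the path scheme differs. A minimal word-count sketch for spark-shell (the local path matches the one used later in this article; the HDFS URI is only a hypothetical example):

val wordCounts = sc.textFile("file:///usr/local/hadoop/spark/test/data")
  .flatMap(_.split(" "))   // split each line into words
  .map(word => (word, 1))  // pair each word with a count of 1
  .reduceByKey(_ + _)      // sum the counts per word
wordCounts.foreach(println)

// For an HDFS file, only the path changes (the namenode host and port below are hypothetical):
// val hdfsWordCounts = sc.textFile("hdfs://localhost:9000/test/data")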

2. Transformations and actions: overview and principles

A transformation (such as map or filter) defines a new RDD from an existing one and is evaluated lazily: calling it does not run any job. An action (such as reduce or collect) triggers an actual Spark job and returns a result to the driver program or writes it to storage.
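
A minimal sketch of this laziness for spark-shell (the numbers are arbitrary):

val doubled = sc.parallelize(List(1, 2, 3)).map(_ * 2) // transformation: no job runs yet
val result = doubled.collect()                          // action: runs the job and returns Array(2, 4, 6)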

3. Common transformations

Commonly used transformations include map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join, and cogroup; each of them is demonstrated in section 5 below.

4. Common actions

Commonly used actions include reduce, collect, count, take, saveAsTextFile, countByKey, and foreach; each of them is demonstrated in section 6 below.

5. Transformation operations in practice
1) map: multiply each element of the collection by 2
scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.take(5)
res0: Array[Int] = Array(1, 2, 3, 4, 5)                                         
scala> rdd.map(_*2).take(5)
res1: Array[Int] = Array(2, 4, 6, 8, 10)

2) filter: keep only the even numbers in the collection

scala> rdd.filter(_%2==0).take(5)
res2: Array[Int] = Array(2, 4)

3) flatMap: split lines of text into individual words
Note: to have spark-shell read a local file, prefix the path with "file://"; otherwise spark-shell will look for the file on HDFS.



The correct usage is as follows:

scala> val datardd = sc.textFile("file:///usr/local/hadoop/spark/test/data")
datardd: org.apache.spark.rdd.RDD[String] = file:///usr/local/hadoop/spark/test/data MapPartitionsRDD[10] at textFile at <console>:24
scala> datardd.flatMap(_.split(" ")).take(10)
res7: Array[String] = Array(hello, world, hello, you, hello, hao, are, you)  

4) groupByKey: group the scores by class

scala> val scorelist = Array(Tuple2("class1",80),Tuple2("class2",75),Tuple2("class1",90),Tuple2("class2",60))
scorelist: Array[(String, Int)] = Array((class1,80), (class2,75), (class1,90), (class2,60))
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:26
scala> val groupScores = scores.groupByKey()
groupScores: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:25
scala> groupScores.take(5)
res8: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(80, 90)), (class2,CompactBuffer(75, 60)))

5) reduceByKey: compute the total score for each class

scala> val sum = scores.reduceByKey(_+_)
sum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:25
scala> sum.take(5)
res9: Array[(String, Int)] = Array((class1,170), (class2,135))

6) sortByKey: sort students by score

scala> val scoreli = Array(Tuple2(80,"leo"),Tuple2(75,"Jack"),Tuple2(90,"Tom"),Tuple2(60,"Marry")) 
scoreli: Array[(Int, String)] = Array((80,leo), (75,Jack), (90,Tom), (60,Marry))
scala> val scoreRdd = sc.parallelize(scoreli)
scoreRdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:26

sortByKey sorts in ascending order by default:

scala> val scoreSortByKey  =scoreRdd.sortByKey()
scoreSortByKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[17] at sortByKey at <console>:25
scala> scoreSortByKey.take(5)
res11: Array[(Int, String)] = Array((60,Marry), (75,Jack), (80,leo), (90,Tom))

To sort in descending order, pass false as an argument:

scala> val scoreSortByKeyDown  =scoreRdd.sortByKey(false)
scoreSortByKeyDown: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[18] at sortByKey at <console>:25
scala> scoreSortByKeyDown.take(5)
res12: Array[(Int, String)] = Array((90,Tom), (80,leo), (75,Jack), (60,Marry))

7) join: print each student's score

scala> val studentlist = Array(Tuple2(1,"leo"),Tuple2(2,"Jack"),Tuple2(3,"Tom"))
studentlist: Array[(Int, String)] = Array((1,leo), (2,Jack), (3,Tom))
scala> val scorelist = Array(Tuple2(1,95),Tuple2(2,98),Tuple2(3,93))
scorelist: Array[(Int, Int)] = Array((1,95), (2,98), (3,93))
scala> val students = sc.parallelize(studentlist)
students: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[19] at parallelize at <console>:26
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala> val studentscores = students.join(scores)
studentscores: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[23] at join at <console>:27

scala> studentscores.take(10)
res13: Array[(Int, (String, Int))] = Array((1,(leo,95)), (3,(Tom,93)), (2,(Jack,98)))

8) cogroup: group animals by key

scala> val list1RDD = sc.parallelize(List((1, "cat"), (2, "dog")))
list1RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> val list2RDD = sc.parallelize(List((1, "tiger"), (1, "elephant"), (3, "panda"), (3, "chicken")))
list2RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val list3RDD = sc.parallelize(List((1, "duck"), (1, "lion"), (3, "bird"), (3, "fish"), (4, "flowers")))
list3RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val cogroupresult = list1RDD.cogroup(list2RDD,list3RDD)
cogroupresult: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[30] at cogroup at <console>:29

scala> cogroupresult.foreach(println)
(4,(CompactBuffer(),CompactBuffer(),CompactBuffer(flowers)))
(1,(CompactBuffer(cat),CompactBuffer(tiger, elephant),CompactBuffer(duck, lion)))
(3,(CompactBuffer(),CompactBuffer(panda, chicken),CompactBuffer(bird, fish)))
(2,(CompactBuffer(dog),CompactBuffer(),CompactBuffer()))

The two examples above show the difference: join matches two pair RDDs by key and emits one output record per matching pair of values, while cogroup first collects all values for the same key within each RDD into an Iterable and then combines those groups, keeping even the keys that appear in only one of the RDDs.

6. Action operations in practice
1)reduce

scala> val numberarray= Array(1,2,3,4,5,6)
numberarray: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(numberarray)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val sum = rdd.reduce(_+_)
sum: Int = 21  

2) collect
collect returns every element of the RDD to the driver program, so it should be used with care on large datasets.

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)

3)count

scala> rdd.count
res1: Long = 6

4)take

scala> rdd.take(rdd.count.toInt)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)

5) saveAsTextFile

rdd.map(_*2).saveAsTextFile("file:///usr/local/hadoop/spark/test/output/array.txt")

Note that the path is treated as an output directory: Spark creates a directory named array.txt containing one part file per partition.

6)countByKey

scala> val scorelist = Array(Tuple2("class1","leo"),Tuple2("class2","Jack"),Tuple2("class1","Tom"),Tuple2("class2","Jen"),("class2","marry"))
scorelist: Array[(String, String)] = Array((class1,leo), (class2,Jack), (class1,Tom), (class2,Jen), (class2,marry))

scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[3] at parallelize at <console>:26

scala> val studentCOunts = scores.countByKey()
studentCOunts: scala.collection.Map[String,Long] = Map(class1 -> 2, class2 -> 3)

7)foreach

scala> studentCOunts.foreach(println)
(class1,2)
(class2,3)
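
Note that in the transcript above foreach is called on the local Map returned by countByKey. Called on an RDD, foreach is an action that applies the function to every element on the executors. A minimal sketch, reusing the scores RDD defined above (in cluster mode the output goes to the executor logs rather than the driver console):

scores.foreach(println) // runs on the executors, one call per (class, student) pair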

7. RDD persistence strategies

Calling cache() or persist() on an RDD tells Spark to keep its partitions after the first action has computed them, so that subsequent actions reuse the stored data instead of recomputing the whole lineage. cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); persist() also accepts other storage levels such as MEMORY_AND_DISK, MEMORY_ONLY_SER, or DISK_ONLY, and unpersist() releases the stored data.
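
A minimal caching sketch for spark-shell (the file path reuses the one from the flatMap example; the storage level shown is just one possible choice):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("file:///usr/local/hadoop/spark/test/data")
lines.persist(StorageLevel.MEMORY_ONLY) // equivalent to lines.cache()
println(lines.count())                  // first action computes the RDD and stores its partitions
println(lines.count())                  // second action reuses the cached data
lines.unpersist()                       // release the cached blocks when they are no longer needed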