RDD Explained

1. Three ways to create an RDD
1) Create an RDD from an in-program collection
Java version:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

/**
 * Create an RDD by parallelizing an in-memory collection.
 * Example: sum the numbers 1 through 10.
 */
public class ParallelizeCollection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ParallelizeCollection") // application name
                .setMaster("local");                 // run locally
        JavaSparkContext jsc = new JavaSparkContext(conf);
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numberRDD = jsc.parallelize(numbers);
        Integer sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("sum=" + sum);
        jsc.close();
    }
}

Scala version:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeCollection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ParallelizeCollection") // application name
      .setMaster("local")                  // run locally
    val sc = new SparkContext(conf) // entry point for Spark Core
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rddList = sc.parallelize(list)
    val sum = rddList.reduce(_ + _)
    println("sum:" + sum)
    sc.stop()
  }
}

2) Create an RDD from a local file
The classic example is word count (a combined sketch follows item 3).
3) Create an RDD from an HDFS file
Word count again; only the path scheme changes, from file:// to hdfs://.
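
A minimal word-count sketch covering both cases. The local path reuses the one from the flatMap section below; the hdfs:// URI is a placeholder, adjust it to your namenode:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    // Local file: note the file:// prefix (explained in the flatMap section below).
    val lines = sc.textFile("file:///usr/local/hadoop/spark/test/data")
    // HDFS file: same call, hdfs:// scheme instead (host/port is a placeholder):
    // val lines = sc.textFile("hdfs://localhost:9000/user/spark/data")

    val counts = lines
      .flatMap(_.split(" ")) // split each line into words
      .map((_, 1))           // pair each word with a count of 1
      .reduceByKey(_ + _)    // sum the counts per word
    counts.collect().foreach(println)

    sc.stop()
  }
}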

2. Transformations and actions: concepts and principles


A transformation (map, filter, flatMap, groupByKey, reduceByKey, ...) builds a new RDD from an existing one and is evaluated lazily: calling it only records the lineage, no job runs. An action (reduce, collect, count, take, saveAsTextFile, ...) triggers the actual computation and either returns a result to the driver or writes it to storage.
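
A quick sketch of the lazy-evaluation principle, assuming a spark-shell session (sc already defined): the map below does no work until an action is called.

val rdd = sc.parallelize(1 to 5)
// Transformation: returns immediately, records the lineage, computes nothing.
val doubled = rdd.map { x => println(s"computing $x"); x * 2 }
// Action: only now does Spark schedule a job and run the map function.
doubled.collect() // prints "computing 1" ... "computing 5"; returns Array(2, 4, 6, 8, 10)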

3. Common transformations


The common transformations, each demonstrated in section 5 below:
- map: apply a function to every element
- filter: keep only the elements that satisfy a predicate
- flatMap: map each element to zero or more elements and flatten the result
- groupByKey: group the values of a pair RDD by key
- reduceByKey: combine the values for each key with a reduce function
- sortByKey: sort a pair RDD by key
- join: join two pair RDDs on their keys
- cogroup: group the values from several pair RDDs by key

4. Common actions


The common actions, each demonstrated in section 6 below:
- reduce: aggregate all elements with a binary function
- collect: bring the whole RDD back to the driver as an array
- count: return the number of elements
- take(n): return the first n elements
- saveAsTextFile: write the RDD out as text files
- countByKey: count the elements per key of a pair RDD
- foreach: apply a function to every element

5. Hands-on with transformations
1) map: multiply each element of the collection by 2
scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.take(5)
res0: Array[Int] = Array(1, 2, 3, 4, 5)                                         
scala> rdd.map(_*2).take(5)
res1: Array[Int] = Array(2, 4, 6, 8, 10)

2) filter: keep the even numbers in the collection

scala> rdd.filter(_%2==0).take(5)
res2: Array[Int] = Array(2, 4)

3) flatMap: split lines of text into words
Note: for spark-shell to read a local file, the path must be prefixed with "file://"; otherwise spark-shell looks for the file on HDFS.



The correct form:

scala> val datardd = sc.textFile("file:///usr/local/hadoop/spark/test/data")
datardd: org.apache.spark.rdd.RDD[String] = file:///usr/local/hadoop/spark/test/data MapPartitionsRDD[10] at textFile at <console>:24
scala> datardd.flatMap(_.split(" ")).take(10)
res7: Array[String] = Array(hello, world, hello, you, hello, hao, are, you)  

4) groupByKey: group each class's scores

scala> val scorelist = Array(Tuple2("class1",80),Tuple2("class2",75),Tuple2("class1",90),Tuple2("class2",60))
scorelist: Array[(String, Int)] = Array((class1,80), (class2,75), (class1,90), (class2,60))
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:26
scala> val groupScores = scores.groupByKey()
groupScores: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:25
scala> groupScores.take(5)
res8: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(80, 90)), (class2,CompactBuffer(75, 60)))
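
A natural follow-up (not in the original article) is to consume the grouped Iterable, for example computing each class's average:

groupScores.map { case (clazz, ss) =>
  (clazz, ss.sum.toDouble / ss.size) // average score per class
}.collect()
// Array((class1,85.0), (class2,67.5))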

5) reduceByKey: total the scores for each class

scala> val sum = scores.reduceByKey(_+_)
sum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:25
scala> sum.take(5)
res9: Array[(String, Int)] = Array((class1,170), (class2,135))

6) sortByKey: sort students by score

scala> val scoreli = Array(Tuple2(80,"leo"),Tuple2(75,"Jack"),Tuple2(90,"Tom"),Tuple2(60,"Marry")) 
scoreli: Array[(Int, String)] = Array((80,leo), (75,Jack), (90,Tom), (60,Marry))
scala> val scoreRdd = sc.parallelize(scoreli)
scoreRdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:26

The sort defaults to ascending order:

scala> val scoreSortByKey  =scoreRdd.sortByKey()
scoreSortByKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[17] at sortByKey at <console>:25
scala> scoreSortByKey.take(5)
res11: Array[(Int, String)] = Array((60,Marry), (75,Jack), (80,leo), (90,Tom))

For descending order, pass false:

scala> val scoreSortByKeyDown  =scoreRdd.sortByKey(false)
scoreSortByKeyDown: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[18] at sortByKey at <console>:25
scala> scoreSortByKeyDown.take(5)
res12: Array[(Int, String)] = Array((90,Tom), (80,leo), (75,Jack), (60,Marry))

7) join: print each student's score

scala> val studentlist = Array(Tuple2(1,"leo"),Tuple2(2,"Jack"),Tuple2(3,"Tom"))
studentlist: Array[(Int, String)] = Array((1,leo), (2,Jack), (3,Tom))
scala> val scorelist = Array(Tuple2(1,95),Tuple2(2,98),Tuple2(3,93))
scorelist: Array[(Int, Int)] = Array((1,95), (2,98), (3,93))
scala> val students = sc.parallelize(studentlist)
students: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[19] at parallelize at <console>:26
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala> val studentscores = students.join(scores)
studentscores: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[23] at join at <console>:27

scala> studentscores.take(10)
res13: Array[(Int, (String, Int))] = Array((1,(leo,95)), (3,(Tom,93)), (2,(Jack,98)))

8) cogroup: grouping animals by key

scala> val list1RDD = sc.parallelize(List((1, "cat"), (2, "dog")))
list1RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> val list2RDD = sc.parallelize(List((1, "tiger"), (1, "elephant"), (3, "panda"), (3, "chicken")))
list2RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val list3RDD = sc.parallelize(List((1, "duck"), (1, "lion"), (3, "bird"), (3, "fish"), (4, "flowers")))
list3RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val cogroupresult = list1RDD.cogroup(list2RDD,list3RDD)
cogroupresult: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[30] at cogroup at <console>:29

scala> cogroupresult.foreach(println)
(4,(CompactBuffer(),CompactBuffer(),CompactBuffer(flowers)))
(1,(CompactBuffer(cat),CompactBuffer(tiger, elephant),CompactBuffer(duck, lion)))
(3,(CompactBuffer(),CompactBuffer(panda, chicken),CompactBuffer(bird, fish)))
(2,(CompactBuffer(dog),CompactBuffer(),CompactBuffer()))

The two examples above show the difference: join matches two RDDs by key and emits one record per pair of matching values, dropping keys that appear in only one RDD; cogroup first gathers all of the values for each key within each RDD, then emits every key exactly once, with one (possibly empty) Iterable per input RDD.
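
To make the contrast concrete, here is a minimal sketch (not from the original article) applying both operators to the same two pair RDDs; the order of the collected results may vary:

val a = sc.parallelize(List((1, "x"), (1, "y"), (2, "z")))
val b = sc.parallelize(List((1, "p"), (3, "q")))

a.join(b).collect()
// Array((1,(x,p)), (1,(y,p)))
// one record per matching value pair; keys 2 and 3 have no match and are dropped

a.cogroup(b).collect()
// Array((1,(CompactBuffer(x, y),CompactBuffer(p))),
//       (2,(CompactBuffer(z),CompactBuffer())),
//       (3,(CompactBuffer(),CompactBuffer(q))))
// every key appears exactly once, with all of its values gathered per input RDD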

6. Hands-on with actions
1)reduce

scala> val numberarray= Array(1,2,3,4,5,6)
numberarray: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(numberarray)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val sum = rdd.reduce(_+_)
sum: Int = 21  

2)collect

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)

3)count

scala> rdd.count
res1: Long = 6

4)take

scala> rdd.take(rdd.count.toInt)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)

Fetching every element like this is equivalent to collect; normally take is called with a small n, e.g. rdd.take(3).

5)saveAsTextFile

rdd.map(_*2).saveAsTextFile("file:///usr/local/hadoop/spark/test/output/array.txt")

Note that saveAsTextFile treats the given path as an output directory: Spark creates a directory named array.txt containing part-* files, not a single text file.

6)countByKey

scala> val scorelist = Array(Tuple2("class1","leo"),Tuple2("class2","Jack"),Tuple2("class1","Tom"),Tuple2("class2","Jen"),("class2","marry"))
scorelist: Array[(String, String)] = Array((class1,leo), (class2,Jack), (class1,Tom), (class2,Jen), (class2,marry))

scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[3] at parallelize at <console>:26

scala> val studentCounts = scores.countByKey()
studentCounts: scala.collection.Map[String,Long] = Map(class1 -> 2, class2 -> 3)

7)foreach

scala> studentCounts.foreach(println)
(class1,2)
(class2,3)

7. RDD persistence strategies


To reuse an RDD across several actions without recomputing its lineage each time, mark it with cache() or persist(); cache() is shorthand for persist(StorageLevel.MEMORY_ONLY). The main storage levels are:
- MEMORY_ONLY (the default): deserialized objects in memory; partitions that do not fit are recomputed on demand
- MEMORY_AND_DISK: partitions that do not fit in memory are spilled to disk
- MEMORY_ONLY_SER / MEMORY_AND_DISK_SER: store serialized bytes, more space-efficient but more CPU to read
- DISK_ONLY: store partitions only on disk
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, ...: as above, but each partition replicated on two nodes
Prefer the cheapest level that fits your data, and call unpersist() when the RDD is no longer needed.
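
A minimal caching sketch, assuming a spark-shell session and reusing the local file path from the flatMap example above:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("file:///usr/local/hadoop/spark/test/data")
  .persist(StorageLevel.MEMORY_ONLY) // equivalent to .cache()

lines.count() // first action: reads the file and materializes the cache
lines.count() // second action: served from memory, no re-read

lines.unpersist() // release the cached partitions when done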