1、创建RDD的三种方式
1)使用程序中的集合创建RDD
java版代码如下:
/**
 * Creates an RDD by parallelizing an in-memory collection.
 * Example: adds up the integers 1 through 10 and prints the result.
 */
public class ParallelizeCollection {
    /**
     * Builds a Spark context with master "local", turns the list 1..10 into
     * an RDD, reduces it by addition and prints the total to standard output.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ParallelizeCollection") // application name
                .setMaster("local");                 // run locally
        // try-with-resources guarantees the context is closed even when the
        // job throws, which the trailing jsc.close() call could not do.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numberRDD = jsc.parallelize(numbers);
            // The reduced value is the sum of the elements; the printed label
            // is kept as "Count=" so the program output stays unchanged.
            Integer total = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            System.out.println("Count=" + total);
        }
    }
}
scala版代码如下:
/**
 * Creates an RDD by parallelizing a local collection and prints the sum of
 * the integers 1 through 10.
 */
object ParallelizeCollection {
  /**
   * Builds a Spark context with master "local", parallelizes the list 1..10,
   * reduces it by addition and prints the result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ParallelizeCollection") // job name
      .setMaster("local")
    val sc = new SparkContext(conf) // entry point of a Spark Core program
    try {
      val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      val rddList = sc.parallelize(list)
      val sum = rddList.reduce(_ + _)
      println(s"sum:$sum")
    } finally {
      // Stop the context even if the job above fails, so it is not leaked.
      sc.stop()
    }
  }
}
2)使用本地文件创建RDD
WordCount程序就是一个例子:通过 sc.textFile("file:///本地文件路径") 读取本地文件来创建RDD。
3)使用HDFS文件创建RDD
WordCount程序同样是一个例子:通过 sc.textFile("hdfs://NameNode地址:端口/文件路径") 读取HDFS上的文件来创建RDD。
2、transformation和action讲解与原理剖析
3、常用transformation介绍
4、常用action介绍
5、transformation操作开发实战
1)map:将集合中每个元素乘以2
scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.take(5)
res0: Array[Int] = Array(1, 2, 3, 4, 5)
scala> rdd.map(_*2).take(5)
res1: Array[Int] = Array(2, 4, 6, 8, 10)
2)filter:筛选出集合中的偶数(filter会保留满足条件的元素)
scala> rdd.filter(_%2==0).take(5)
res2: Array[Int] = Array(2, 4)
3)flatMap:将文本行拆分为多个单词
这里说明一下:如果要在spark-shell中读取本地文件,文件路径前面要加上“file://”前缀(绝对路径本身以“/”开头,所以完整写法是“file:///...”),否则spark-shell会到HDFS上去找文件。
正确的写法如下:
scala> val datardd = sc.textFile("file:///usr/local/hadoop/spark/test/data")
datardd: org.apache.spark.rdd.RDD[String] = file:///usr/local/hadoop/spark/test/data MapPartitionsRDD[10] at textFile at <console>:24
scala> datardd.flatMap(_.split(" ")).take(10)
res7: Array[String] = Array(hello, world, hello, you, hello, hao, are, you)
4)groupByKey,将每个班级的成绩分组
scala> val scorelist = Array(Tuple2("class1",80),Tuple2("class2",75),Tuple2("class1",90),Tuple2("class2",60))
scorelist: Array[(String, Int)] = Array((class1,80), (class2,75), (class1,90), (class2,60))
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:26
scala> val groupScores = scores.groupByKey()
groupScores: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:25
scala> groupScores.take(5)
res8: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(80, 90)), (class2,CompactBuffer(75, 60)))
5)reduceByKey:统计每个班级的总分
scala> val sum = scores.reduceByKey(_+_)
sum: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:25
scala> sum.take(5)
res9: Array[(String, Int)] = Array((class1,170), (class2,135))
6)sortByKey:按照学生分数排序
scala> val scoreli = Array(Tuple2(80,"leo"),Tuple2(75,"Jack"),Tuple2(90,"Tom"),Tuple2(60,"Marry"))
scoreli: Array[(Int, String)] = Array((80,leo), (75,Jack), (90,Tom), (60,Marry))
scala> val scoreRdd = sc.parallelize(scoreli)
scoreRdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:26
这里排序,默认是升序
scala> val scoreSortByKey =scoreRdd.sortByKey()
scoreSortByKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[17] at sortByKey at <console>:25
scala> scoreSortByKey.take(5)
res11: Array[(Int, String)] = Array((60,Marry), (75,Jack), (80,leo), (90,Tom))
如果要降序排序,就加一个参数false
scala> val scoreSortByKeyDown =scoreRdd.sortByKey(false)
scoreSortByKeyDown: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[18] at sortByKey at <console>:25
scala> scoreSortByKeyDown.take(5)
res12: Array[(Int, String)] = Array((90,Tom), (80,leo), (75,Jack), (60,Marry))
7)join:打印每个学生的成绩
scala> val studentlist = Array(Tuple2(1,"leo"),Tuple2(2,"Jack"),Tuple2(3,"Tom"))
studentlist: Array[(Int, String)] = Array((1,leo), (2,Jack), (3,Tom))
scala> val scorelist = Array(Tuple2(1,95),Tuple2(2,98),Tuple2(3,93))
scorelist: Array[(Int, Int)] = Array((1,95), (2,98), (3,93))
scala> val students = sc.parallelize(studentlist)
students: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[19] at parallelize at <console>:26
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala> val studentscores = students.join(scores)
studentscores: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[23] at join at <console>:27
scala> studentscores.take(10)
res13: Array[(Int, (String, Int))] = Array((1,(leo,95)), (3,(Tom,93)), (2,(Jack,98)))
8)cogroup:动物分类
scala> val list1RDD = sc.parallelize(List((1, "cat"), (2, "dog")))
list1RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24
scala> val list2RDD = sc.parallelize(List((1, "tiger"), (1, "elephant"), (3, "panda"), (3, "chicken")))
list2RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> val list3RDD = sc.parallelize(List((1, "duck"), (1, "lion"), (3, "bird"), (3, "fish"), (4, "flowers")))
list3RDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> val cogroupresult = list1RDD.cogroup(list2RDD,list3RDD)
cogroupresult: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[30] at cogroup at <console>:29
scala> cogroupresult.foreach(println)
(4,(CompactBuffer(),CompactBuffer(),CompactBuffer(flowers)))
(1,(CompactBuffer(cat),CompactBuffer(tiger, elephant),CompactBuffer(duck, lion)))
(3,(CompactBuffer(),CompactBuffer(panda, chicken),CompactBuffer(bird, fish)))
(2,(CompactBuffer(dog),CompactBuffer(),CompactBuffer()))
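从上面两个实例可以看出来:join是根据key把两个RDD做内连接,只保留两边都存在的key,每一对匹配的value生成一条(key,(value1,value2))记录;而cogroup会先把每个RDD中相同key的value各自合并成一个集合(Iterable),再按key把各个RDD的集合组合在一起,某个RDD中不存在的key对应一个空集合(如上面key为2、4的结果)。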
6、action操作开发实战
1)reduce
scala> val numberarray= Array(1,2,3,4,5,6)
numberarray: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(numberarray)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val sum = rdd.reduce(_+_)
sum: Int = 21
2)collect
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
3)count
scala> rdd.count
res1: Long = 6
4)take
scala> rdd.take(rdd.count.toInt)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)
5)saveAsTextFile(注意:传入的路径会被创建为一个输出目录,结果保存在该目录下的part-*文件中,而不是单个文本文件)
scala> rdd.map(_*2).saveAsTextFile("file:///usr/local/hadoop/spark/test/output/array.txt")
6)countByKey
scala> val scorelist = Array(Tuple2("class1","leo"),Tuple2("class2","Jack"),Tuple2("class1","Tom"),Tuple2("class2","Jen"),("class2","marry"))
scorelist: Array[(String, String)] = Array((class1,leo), (class2,Jack), (class1,Tom), (class2,Jen), (class2,marry))
scala> val scores = sc.parallelize(scorelist)
scores: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[3] at parallelize at <console>:26
scala> val studentCOunts = scores.countByKey()
studentCOunts: scala.collection.Map[String,Long] = Map(class1 -> 2, class2 -> 3)
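7)foreach(注意:studentCOunts是countByKey返回的本地Map,所以下面调用的是Scala集合的foreach;RDD的foreach action写法相同,例如 scores.foreach(println))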
scala> studentCOunts.foreach(println)
(class1,2)
(class2,3)
7、RDD持久化策略