Spark RDD操作
RDD(Resilient Distributed Dataset),弹性分布式数据集是一个容错的,并行的数据结构,可以显式地将数据存储到磁盘和内存中,并能控制数据的分区。RDD有两种操作方式,转换(transformation)和动作(action)。转换是从现有的数据集创建一个新的数据集;动作在数据集上运行计算后,返回一个值给驱动程序。
转换都是惰性的,只有到动作启动时才会真正去取得数据运算。
一、Spark RDD创建操作
RDD数据来源:两个数据来源创建RDD,一种是外部数据源,如HDFS或是本地文件系统等其他途径读入文件建立分布数据集;另一种是通过spark所提供的方法来创建数据,parallelize()创建一个能够并行操作的分布数据集。
1.外部数据源
通过SparkContext的textFile方法读入文件建立一个分布数据集。
deftextFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
val distFile1 = sc.textFile("data.txt") //本地当前目录下的文件
val distFile2 = sc.textFile("hdfs://ip:port/user/names.txt") //HDFS文件
val distFile3 = sc.textFile("file:/input/data.txt") //本地指定目录下的文件
val distFile4 = sc.textFile("/input/data1.txt, /input/data2.txt") //读取多个文件
textFile方法第二个参数是用来设置文件的分片数量。默认情况下,Spark会为文件的每一个块创建一个分片。
【注】分片的数量绝不能小于文件块的数量
2.数据集合
val data = Array(1,2,3,4,5,6,7,8,9)
//parallelize(data, nums)中nums是设置分片数量的,默认情况下spark会自动根据集群情况进行设定的
val distData = sc.parallelize(data, 3)
二、Spark RDD转换操作
1. map
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,RDD之间的元素是一对一关系。
scala> val mrdd = sc.parallelize(1 to 9, 3)
//mrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:27
scala> val mrdd2 = mrdd.map(x => x*2)
//mrdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29
scala> mrdd2.collect
//res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
2.filter
对RDD元素进行过滤,返回一个新的数据集,由经过func函数后返回值为true的原元素组成。
scala> val frdd = mrdd2.filter(x => x>10)
//frdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:31
scala> frdd.collect
//res7: Array[Int] = Array(12, 14, 16, 18)
3.flatMap
类似于map,但是每一个输入元素会被映射为0到多个输入元素,RDD之间的元素是一对多关系。
scala> val fmrdd =frdd.flatMap(x => x to 20)
//fmrdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at flatMap at <console>:33
scala> fmrdd.collect
res8: Array[Int] = Array(12, 13, 14, 15, 16, 17, 18, 19, 20, 14, 15, 16, 17, 18, 19, 20, 16, 17, 18, 19, 20, 18, 19, 20)
4.sample
sample(withReplacement, fraction, seed)是根据给定的随机种子seed,随机抽样出数量为fraction的数据。其中,withReplacement:是否放回抽样;fraction:比例,0.1表示10%;seed:随机种子,相同的seed得到的随机序列是一样的。
scala> val data = sc.parallelize(1 to 1000, 3)
//data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> data.sample(false, 0.1,0).count
//res0: Long = 115
5.union
union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和ottherDataset联合而成。
val unionRdd = RDD1.union(RDD2)
6.intersection
intersection(otherDataset)是数据交集,返回一个新的数据集,包含两个数据集的交集数据。
val rdd3 = rdd1.intersection(rdd2)
7.distinct
distinct([numTasks])数据去重,返回一个数据集,它是对两个数据集去除重复数据,numTasks参数是设置任务并行数量。
val rdd2 = rdd1.distinct()
8.groupByKey
groupByKey([numTasks])是数据分组操作,在一个由(K, V)键值对组成的数据集上调用,返回一个(K, Seq[V])对的数据集。
scala> val rdd0 = sc.parallelize(Array((1,1), (1, 2), (1,3), (2, 1), (2, 2), (2, 3)), 3)
//rdd0: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:27
scala> val rdd1 = rdd0.groupByKey()
//rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:29
scala> rdd1.collect
//Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1, 2, 3)), (2,CompactBuffer(1, 2, 3)))