layout: post
title: Spark Operators
date: 2017-02-27 16:03:52
tags: [Spark Operators, Spark, Big Data]
categories: "Big Data"
RDD creation operations
- parallelize
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Creates an RDD from a Seq collection.
Parameter 1: the Seq collection (required).
Parameter 2: the number of partitions; defaults to defaultParallelism, i.e. the number of CPU cores allocated to the application (Spark will run one task for each partition of the cluster).
scala> val data = Array(1,2,3,4,5,6);
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val num = sc.parallelize(data);
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26
scala> num.collect();
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val num1 = sc.parallelize(1 to 10);
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> num1.collect();
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> num1.partitions.size
res3: Int = 4
scala> val num2 = sc.parallelize(1 to 10 ,2);
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> num2.partitions.size
res4: Int = 2
- makeRDD (skipped)
Behaves the same as parallelize for the common case (the two-argument overload simply delegates to it); a second overload additionally accepts preferred locations for each element, as sketched below.
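A minimal sketch of the location-preference overload, assuming a spark-shell with sc in scope; host1 and host2 are placeholder hostnames, not real nodes.
val withLocations = sc.makeRDD(Seq(
  (1, Seq("host1")),
  (2, Seq("host2"))
))
// Each partition reports the location hints it was created with.
withLocations.partitions.foreach(p => println(withLocations.preferredLocations(p)))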
Creating an RDD from external storage
- textFile
Creates an RDD from a file on HDFS.
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> textFile.collect();
res6: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> textFile.count();
res7: Long = 3
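textFile also accepts a second argument, minPartitions, which is a lower bound rather than an exact count. A minimal sketch with the same file:
val textFile4 = sc.textFile("/user/test/1.txt", 4)
// For a splittable text file this usually yields at least 4 partitions.
println(textFile4.partitions.size)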
Creating from other HDFS file formats (skipped)
- hadoopFile
- sequenceFile
- objectFile
- newAPIHadoopFile
Creating from the Hadoop API (skipped)
- hadoopRDD
- newAPIHadoopRDD
Basic RDD transformations (1): map, flatMap, distinct
Data preparation
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ bin/hadoop dfs -copyFromLocal ~/test/1.txt /user/test/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
hadoop@hzbxs-bigdata16:~/hadoop-hp2$ cat ~/test/1.txt
Hello World
Hello Yang
Hello SCALA
hadoop@hzbxs-bigdata16:~/hadoop-hp2$
- map
Applies the function passed to map to every element of the RDD, producing one new element per input element. Input and output partitions correspond one to one: the output RDD has exactly as many partitions as the input RDD.
// Load the file from HDFS
scala> val textFile = sc.textFile("/user/test/1.txt");
textFile: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[13] at textFile at <console>:24
// Print the contents of textFile
scala> textFile.collect();
res7: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
// The map operator
scala> val mapdata = textFile.map(line => line.split("\\s+"));
mapdata: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:26
// Print the contents of mapdata
scala> mapdata.collect();
res8: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Yang), Array(Hello, SCALA))
- flatMap
Similar to map, but each input element may be mapped to zero or more output elements, and the results are flattened into a single RDD rather than kept as nested collections.
scala> val flatmapdata = textFile.flatMap(line => line.split("\\s+"));
flatmapdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26
scala> flatmapdata.collect();
res10: Array[String] = Array(Hello, World, Hello, Yang, Hello, SCALA)
- distinct
Removes duplicate elements.
scala> val flatdistincedata = textFile.flatMap(line => line.split("\\s+")).distinct();
flatdistincedata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at distinct at <console>:26
scala> flatdistincedata.collect();
res0: Array[String] = Array(Hello, SCALA, World, Yang)
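distinct also has an overload that takes the number of partitions for the result, since deduplication involves a shuffle. A minimal sketch reusing textFile from above:
val distinct2 = textFile.flatMap(_.split("\\s+")).distinct(2)
println(distinct2.partitions.size)            // 2
println(distinct2.collect().mkString(", "))   // order may vary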
Basic RDD transformations (2): coalesce, repartition
- coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
Adjusts the RDD to numPartitions partitions. The partition count can only be increased beyond the original value when shuffle is true.
scala> var data = sc.textFile("/user/test/1.txt");
data: org.apache.spark.rdd.RDD[String] = /user/test/1.txt MapPartitionsRDD[16] at textFile at <console>:24
scala> data.collect();
res9: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> data.partitions.size;
res11: Int = 2
scala> var newdata = data.coalesce(1);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[17] at coalesce at <console>:26
scala> newdata.collect();
res12: Array[String] = Array(Hello World, Hello Yang, Hello SCALA)
scala> newdata.partitions.size;
res13: Int = 1
// shuffle defaults to false, so requesting more partitions than the original has no effect
scala> var newdata = data.coalesce(3);
newdata: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:26
scala> newdata.partitions.size;
res14: Int = 2
// with shuffle = true, the partition count can be increased beyond the original
scala> var newdata = data.coalesce(3,true);
newdata: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at coalesce at <console>:26
scala> newdata.partitions.size;
res15: Int = 3
- repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Equivalent to calling coalesce with shuffle = true.
scala> var newdata1 = data.repartition(3);
newdata1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:26
scala> newdata1.partitions.size
res16: Int = 3
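A typical reason to prefer coalesce over repartition is shrinking the partition count without a shuffle after a filter has emptied most partitions. A minimal sketch (all names are illustrative):
val bigRdd   = sc.parallelize(1 to 100000, 100)
val filtered = bigRdd.filter(_ % 1000 == 0)   // few surviving elements per partition
val compact  = filtered.coalesce(10)          // merge down to 10 partitions, no shuffle
println(compact.partitions.size)              // 10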
Basic RDD transformations (3): union, intersection, subtract
- union
def union(other: RDD[T]): RDD[T]
Merges the two RDDs into one; duplicates are not removed.
scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(6 to 10,2);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect();
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- intersection
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]
Returns the intersection of the two RDDs; you can specify the number of partitions of the result, or a partitioner. The result contains no duplicates.
scala> var rdd1 = sc.makeRDD(1 to 5,1);
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(3 to 7,1);
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24
scala> rdd1.intersection(rdd2).collect();
res28: Array[Int] = Array(4, 3, 5)
scala> var rdd3 = rdd1.intersection(rdd2,4);
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[50] at intersection at <console>:28
scala> rdd3.partitions.size
res31: Int = 4
- subtract
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
Returns the elements that appear in this RDD but not in otherRDD; duplicates are not removed.
scala> var rdd1 = sc.makeRDD(Seq(1,1,2,2,3,3));
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Seq(3,3,4,4,5,5));
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at makeRDD at <console>:24
scala> rdd1.subtract(rdd2).collect();
res32: Array[Int] = Array(1, 1, 2, 2)
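For completeness, a minimal sketch showing that union keeps duplicates and that chaining distinct() gives true set union:
val a = sc.makeRDD(Seq(1, 2, 3, 3))
val b = sc.makeRDD(Seq(3, 4, 5))
println(a.union(b).collect().sorted.mkString(","))            // 1,2,3,3,3,4,5
println(a.union(b).distinct().collect().sorted.mkString(",")) // 1,2,3,4,5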
Basic RDD transformations (4): mapPartitions, mapPartitionsWithIndex
- mapPartitions
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T.
Similar to map, except that mapPartitions operates on each partition of the RDD as a whole; func must transform an Iterator[T] into an Iterator[U].
scala> var rdd = sc.makeRDD(1 to 10,4);
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at makeRDD at <console>:24
scala> var rdd1 = rdd.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0;
| while (x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[58] at mapPartitions at <console>:26
scala> rdd1.collect
res33: Array[Int] = Array(3, 12, 13, 27)
scala> rdd.partitions.size
res34: Int = 4
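The example above can be written more compactly; a minimal equivalent sketch that sums each partition without the mutable loop:
val perPartitionSums = rdd.mapPartitions(iter => Iterator(iter.sum))
println(perPartitionSums.collect().mkString(","))   // 3,12,13,27 (same result as above)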
- mapPartitionsWithIndex
mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator< T >) => Iterator< U > when running on an RDD of type T.
Similar to mapPartitions, except that func must transform an (Int, Iterator[T]) pair into an Iterator[U]; the Int is the index of the partition.
scala> var rdd1 = rdd.mapPartitionsWithIndex{ (index,x) => {
| var result = List[String]()
| var i = 0
| while (x.hasNext){
| i+=x.next()
| }
| result.::(index+"|"+i).iterator
| }
| }
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at mapPartitionsWithIndex at <console>:26
scala> rdd1.collect
res38: Array[String] = Array(0|3, 1|12, 2|13, 3|27)
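Likewise, a minimal compact equivalent of the mapPartitionsWithIndex example above:
val labelled = rdd.mapPartitionsWithIndex((index, iter) => Iterator(index + "|" + iter.sum))
println(labelled.collect().mkString(", "))   // 0|3, 1|12, 2|13, 3|27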
Basic RDD transformations (5): groupByKey, reduceByKey, reduceByKeyLocally
- groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] // specify the number of partitions
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
scala> var arr = Array(("1","YANG"),("1","YANG1"),("2","HELLO"),("2","HELLO2"),("3","WORLD"));
arr: Array[(String, String)] = Array((1,YANG), (1,YANG1), (2,HELLO), (2,HELLO2), (3,WORLD))
scala> var rdd = sc.makeRDD(arr);
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[61] at makeRDD at <console>:26
scala> var rdd1 = rdd.groupByKey();
rdd1: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:28
scala> rdd1.collect
res39: Array[(String, Iterable[String])] = Array((1,CompactBuffer(YANG, YANG1)), (2,CompactBuffer(HELLO, HELLO2)), (3,CompactBuffer(WORLD)))
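Grouped values are usually post-processed with mapValues; a minimal sketch that counts the values per key using the rdd above (output order may vary). For plain sums or counts, reduceByKey is preferable, as the note above says.
val countsPerKey = rdd.groupByKey().mapValues(_.size)
println(countsPerKey.collect().mkString(", "))   // (1,2), (2,2), (3,1)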
- reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",3),("B",3)));
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:24
scala> var rdd1 = rdd.reduceByKey((x,y) => x+y)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[64] at reduceByKey at <console>:26
scala> rdd1.collect
res40: Array[(String, Int)] = Array((A,6), (B,6))
scala> rdd.partitions.size
res41: Int = 4
scala> rdd1.partitions.size
res42: Int = 4
scala> var rdd1 = rdd.reduceByKey((x,y) => x+y,10);
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:26
scala> rdd1.collect
res43: Array[(String, Int)] = Array((A,6), (B,6))
scala> rdd1.partitions.size
res44: Int = 10
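As a small end-to-end sketch tying flatMap, map and reduceByKey together, a word count over the 1.txt file used earlier (output order may vary):
val wordCounts = sc.textFile("/user/test/1.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
println(wordCounts.collect().mkString(", "))   // (Hello,3), (World,1), (Yang,1), (SCALA,1)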
- reduceByKeyLocally
Similar to reduceByKey, but the result is returned to the driver as a Map instead of an RDD.
scala> rdd.collect
res45: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (B,3), (B,3))
scala> rdd.reduceByKeyLocally((x,y) => x+y)
res46: scala.collection.Map[String,Int] = Map(B -> 6, A -> 6)
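An equivalent result can be obtained with reduceByKey followed by collectAsMap; like reduceByKeyLocally, the entire Map lands on the driver, so this only makes sense when the set of keys is small. A minimal sketch using the rdd above:
val localMap: scala.collection.Map[String, Int] = rdd.reduceByKey(_ + _).collectAsMap()
println(localMap)   // e.g. Map(A -> 6, B -> 6)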