SparkCore(一)(RDD和一些算子)

什么是RDD

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

  • 一组分区(Partition),即数据集的基本组成单位;

  • 一个计算每个分区的函数;

  • RDD之间的依赖关系;

  • 一个Partitioner,即RDD的分片函数;

  • 一个列表,存储存取每个Partition的优先位置(preferred location)。

RDD创建的方法

  • 从集合中创建 并行度一般为2
##makerdd或parallise都是根据totalcpucores和2比较最大值
##如果直接覆盖makerdd或parallise的第二个分区个数的参数可以改变数量
 override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

#查看源码所得
sc.parallelize

#makeRDD实际上是在内部创建了一个parallelize
sc.makeRDD

  • 从文件中转换
#从文件转换RDD
sc.textFile

#从文件夹拉取多个文件
sc.wholeTextFiles("data/baseinput/ratings100/")

  • textFile在读取小文件的时候,会参考小文件的个数,文件个数越多,分区个数越多

  • sc.textFile遇到小文件没有办法很好合并小文件的,即便重写第二个参数也没有作用

  • 用textFile时,它的partition的数量是与文件夹下的文件数量(实例中用3个xxx.log文件)相关,一个文件就是一个partition(既然3个文件就是:partition=3)。

  • wholeTextFiles的partition数量是根据用户指定或者文件大小来(文件内的数据量少 有hdfs源码默认确定的)

  • 确定与hdfs目录下的文件数量无关!所以说:wholeTextFile通常用于读取许多小文件的需求。

查看RDD分区的shell命令

#从集合中创建
sc.parallelize(Seq(1,2,3,4))

#查看分区数量(并行数量)
res3.getNumPartitions

#查看分区并行数量的内容
#将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

res3.glom.collect

#查看分区数量(并行数量)
res3.partitions.length

关于DRR分区决定因素

  • 第一点:RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源;

  • 第二点:在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍;

  • 第三点:RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关

partitionBy 改变分区

解析:

  • 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.partitions.size
res24: Int = 4

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25

scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))

注意:Spark采用的分区有三种:

  • 水平分区,也就是sc.makerdd按照下标元素划分,

  • Hash划分根据数据确定性划分到某个分区,一般只给定分区数。

  • Range分区该方法一般按照元素大小进行划分不同区域,每个分区表示一个数据区域,如数组中每个数是[0,100]之间的随机数,Range划分首先将区域划分为10份,然后将数组中每个数字分发到不同的分区,比如将18分到(10,20]的分区,最后对每个分区进行排序。

RDD编程

在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。

RDD的转化 ( 重点掌握 )

RDD整体上分为 TRANSFORMATIONS 跟 ACTIONS 两种

Value类型

map(func) 重点

将RDD创建的集合转换为另外一个映射集合,例如,如果将一个Array中的数全部 *2 输出,那么就会用到map方法。例如

//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合内每个元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)        
mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。同样以上述的需求为例:

//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合内每个元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)       
mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];

glom

将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重点

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

上述例子解释是创建一个1到4的序列,然后把能被2整除的放进一个元祖中,不能被2整除的放入另外一个元祖中。那么分组的条件就是%2

filter(func) 重点

过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。比如创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sortBy(func,[ascending], [numTasks]) 重点

使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

//创建一个RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

//按照与3余数的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

Key-Value类型

partitionBy

pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。

groupByKey

作用:groupByKey也是对每个key进行操作,但只生成一个sequence。

//创建一个pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

//将相同key对应值聚合到一个sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

//打印结果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

//计算相同key对应值的相加结果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

//打印结果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

//算相同key对应值的相加结果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

//打印结果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))

reduceByKey和groupByKey的区别

1.reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].

2.groupByKey:按照key进行分组,直接进行shuffle。

aggregateByKey

在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

(1)zeroValue:给每一个分区中的每一个key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

//取出每个分区相同key对应值的最大值,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

//打印结果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的区别
  1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
  1. groupByKey:按照key进行分组,直接进行shuffle。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一个V变成一个C(例    如,创建一个单元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v ,将一个V合并到一个C中(例如,将它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 ,将两个C合并成一个C。
  1. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容