Spark之RDD算子-转换算子

RDD-Transformation

转换(Transformation)算子就是对RDD进行操作的接口函数,其作用是将一个或多个RDD变换成新的RDD。

使用Spark进行数据计算,在利用创建算子生成RDD后,数据处理的算法设计和程序编写的最关键部分,就是利用变换算子对原始数据产生的RDD进行一步一步的变换,最终得到期望的计算结果。

对于变换算子可理解为分两类:1,对Value型RDD进行变换的算子;2,对Key/Value型RDD进行变换算子。在每个变换中有仅对一个RDD进行变换的,也有是对两个RDD进行变换的。

对单个Value型的RDD进行变换

  • map
  • filter
  • distinct
  • flatMap
  • sample
  • union
  • intersection
  • groupByKey
    对于上面列出的几个RDD转换算子因为在前面的文章里有介绍了,这里就不进行示例展示了。详见
coalesce——重新分区

将当前RDD进行重新分区,生成一个以numPartitions参数指定的分区数存储的新RDD。参数shuffle为true时在变换过程中进行shuffle操作,否则不进行shuffle。

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

Note:
With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner. The optional partition coalescer passed in must be serializable.

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8), 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.partitions
res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@7b0, org.apache.spark.rdd.ParallelCollectionPartition@7b1, org.apache.spark.rdd.ParallelCollectionPartition@7b2, org.apache.spark.rdd.ParallelCollectionPartition@7b3)

scala> rdd.partitions.length
res14: Int = 4

scala> rdd.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

scala> rdd.glom.collect
res16: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8))

scala> val newRDD = rdd.coalesce(2, false)
newRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[9] at coalesce at <console>:26

scala> newRDD.partitions.length
res17: Int = 2

scala> newRDD.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

scala> newRDD.glom.collect
res19: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))
coalesce-test
coalesce
pipe——调用Shell命令
# Return an RDD created by piping elements to a forked external process.
def pipe(command: String): RDD[String]

在Linux系统中,有许多对数据进行处理的shell命令,我们可能通过pipe变换将一些shell命令用于Spark中生成新的RDD。

scala> val rdd = sc.parallelize(0 to 7, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.glom.collect
res20: Array[Array[Int]] = Array(Array(0, 1), Array(2, 3), Array(4, 5), Array(6, 7))

scala> rdd.pipe("head -n 1").collect #提取每一个分区中的第一个元素构成新的RDD
res21: Array[String] = Array(0, 2, 4, 6)
pipe-RDD
pipe

sortBy——排序

对原RDD中的元素按照函数f指定的规则进行排序,并可通过ascending参数进行升序或降序设置,排序后的结果生成新的RDD,新的RDD的分区数量可以由参数numPartitions指定,默认与原RDD相同的分区数。

# Return this RDD sorted by the given key function.
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy
scala> val rdd = sc.parallelize(List(2,1,4,3),1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> rdd.glom.collect
res24: Array[Array[Int]] = Array(Array(2, 1, 4, 3))

scala> rdd.sortBy(x=>x, true).collect
res25: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.sortBy(x=>x, false).collect
res26: Array[Int] = Array(4, 3, 2, 1)

对两个Value型RDD进行变换

cartesian——笛卡尔积

输入参数为另一个RDD,返回两个RDD中所有元素的笛卡尔积。

# Return the Cartesian product of this RDD and another one, 
# that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.
def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
cartesian-RDD
scala> val rdd1 = sc.parallelize(List("a", "b", "c"),1)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,3), 1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2).collect
res27: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,1), (b,2), (b,3), (c,1), (c,2), (c,3))

subtract——补集

输入参数为另一个RDD,返回原始RDD与输入参数RDD的补集,即生成由原始RDD中而不在输入参数RDD中的元素构成新的RDD,参数numPartitions指定新RDD分区数。

#Return an RDD with the elements from this that are not in other.
defsubtract(other: RDD[T], numPartitions: Int): RDD[T]
subtract-RDD
scala> val rdd1 = sc.parallelize(0 to 5, 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(0 to 2,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> rdd1.subtract(rdd2).collect
res28: Array[Int] = Array(3, 4, 5)
subtract

union——并集

返回原始RDD与另一个RDD的并集。

#  Return the union of this RDD and another one.
def union(other: RDD[T]): RDD[T]

def ++(other: RDD[T]): RDD[T]
#Return the union of this RDD and another one.
union-RDD

zip——联结

生成由原始RDD的值为Key,另一个RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD


Paste_Image.png

对Key/Value型RDD进行变换

对单个Key-Value型RDD进行变换

combineByKey——按Key聚合

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
combineByKey-RDD
scala> val pair = sc.parallelize(List(("fruit", "Apple"), ("fruit", "Banana"), ("vegetable", "Cucumber"), ("fruit", "Cherry"), ("vegetable", "Bean"), ("vegetable", "Pepper")),2)
pair: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[41] at parallelize at <console>:24

scala> val combinePair = pair.combineByKey(List(_), (x:List[String], y:String) => y::x, (x:List[String], y:List[String]) => x:::y)
combinePair: org.apache.spark.rdd.RDD[(String, List[String])] = ShuffledRDD[42] at combineByKey at <console>:26

scala> combinePair.collect
res31: Array[(String, List[String])] = Array((fruit,List(Banana, Apple, Cherry)), (vegetable,List(Cucumber, Pepper, Bean)))

flatMapValues——对所有Value进行flatMap

# Pass each value in the key-value pair RDD through a flatMap function without changing the keys;
# this also retains the original RDD's partitioning.
def flatMapValues[U](f: (V) =>TraversableOnce[U]): RDD[(K, U)]
flatMapValue-RDD
scala> val rdd = sc.parallelize(List("a", "boy"), 1).keyBy(_.length)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[44] at keyBy at <console>:24

scala> rdd.collect
res32: Array[(Int, String)] = Array((1,a), (3,boy))

scala> rdd.flatMapValues(x=>"*" + x + "*").collect
res33: Array[(Int, Char)] = Array((1,*), (1,a), (1,*), (3,*), (3,b), (3,o), (3,y), (3,*))

keys——提取Key

将Key/Value型RDD中的元素的Key提取出来,所有Key值构成一个序列形成新的RDD。

# Return an RDD with the keys of each tuple.
def keys: RDD[K]
keys-RDD
scala> val pairs = sc.parallelize(List("wills", "aprilchang","kris"),1).keyBy(_.length) 
pairs: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[47] at keyBy at <console>:24

scala> pairs.collect
res34: Array[(Int, String)] = Array((5,wills), (10,aprilchang), (4,kris))

scala> pairs.keys.collect
res35: Array[Int] = Array(5, 10, 4)

mapValues——对Value值进行变换

将Key/Value型RDD中的元素的Value值使用输入参数函数f进行变换构成一个新的RDD。

# Pass each value in the key-value pair RDD through a map function without changing the keys; 
# this also retains the original RDD's partitioning.
def mapValues[U](f: (V) => U): RDD[(K, U)]
mapValues-RDD

partitionBy——按Key值重新分区

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
#Return a copy of the RDD partitioned using the specified partitioner.
partitionBy-RDD
scala> val pairs = sc.parallelize(0 to 9, 2).keyBy(x=>x)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[51] at keyBy at <console>:24

scala> pairs.collect
res37: Array[(Int, Int)] = Array((0,0), (1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9))

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val partitionPairs = pairs.partitionBy(new HashPartitioner(2)) #按每个key的Hash值进行分区的
partitionPairs: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[52] at partitionBy at <console>:27

scala> partitionPairs.glom.collect
res38: Array[Array[(Int, Int)]] = Array(Array((0,0), (2,2), (4,4), (6,6), (8,8)), Array((1,1), (3,3), (5,5), (7,7), (9,9)))

reduceByKey——按Key值进行Reduce操作

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function.

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function. 
## This will also perform the merging locally on each mapper before sending results to a reducer, 
## similarly to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function.

sortByKey——按Key值排序

values——取得value值构成新的RDD

对两个Key-Value型RDD进行变换

cogroup——按Key值聚合

join——按Key值联结

leftOuterJoin——按Key值进行左外联结

rightOuterJoin——按Key值进行右外联结

subtractByKey——按Key值求补

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容