RDD mapPartitionsWithIndex 与mapPartitions

定义

Transformation Meaning
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

注意,func的返回值是Iterator<U>

  • mapPartitionsWithIndex
    通过下面发方法可以查看每个partition存储的内容是什么
//创建一个有3个partition的RDD
scala> val testRDD = sc.makeRDD(1 to 10, 3)
testRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27

scala> testRDD.partitions.size
res1: Int = 3

scala> var newRDD = testRDD.mapPartitionsWithIndex {
     |   (index, partitionIterator) => {
     |     val partitionsMap = scala.collection.mutable.Map[Int, List[Int]]()
     |     var partitionList = List[Int]()
     | 
     |     while (partitionIterator.hasNext) {
     |       partitionList = partitionIterator.next() :: partitionList
     |     }
     | 
     |     partitionsMap(index) = partitionList
     |     partitionsMap.iterator//返回值
     |   }
     | }
newRDD: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:29

//每个RDD都存储了哪些元素
scala> newRDD.collect
res2: Array[(Int, List[Int])] = Array((0,List(3, 2, 1)), (1,List(6, 5, 4)), (2,List(10, 9, 8, 7)))
  • mapPartitions
    mapPartitions使用样例
scala> val newRDD = testRDD.mapPartitions { item => {
     |   var result = List[String]()
     | 
     |   while (item.hasNext) {
     |     result = (item.next() + 1).toString :: result
     |   }
     | 
     |   result = result ::: List("|")
     |   result.iterator//返回值
     | }
     | }
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at mapPartitions at <console>:29

//每个partition的内容用|分隔,和上一个用例的结果一致
scala> newRDD.collect
res15: Array[String] = Array(4, 3, 2, |, 7, 6, 5, |, 11, 10, 9, 8, |)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,675评论 19 139
  • 背景 一年多以前我在知乎上答了有关LeetCode的问题, 分享了一些自己做题目的经验。 张土汪:刷leetcod...
    土汪阅读 14,353评论 0 33
  • Swift 介绍 简介 Swift 语言由苹果公司在 2014 年推出,用来撰写 OS X 和 iOS 应用程序 ...
    大L君阅读 8,554评论 3 25
  • 136.泛型 泛型代码让你可以写出灵活,可重用的函数和类型,它们可以使用任何类型,受你定义的需求的约束。你可以写出...
    无沣阅读 5,435评论 0 4
  • 近日,随着乐天集团同意其在星州郡的高尔夫球场用做美国在韩部署萨德反导系统的基地。原本就沸沸...
    清风烟雨笑阅读 8,796评论 0 11