Spark系列——关于 mapPartitions的误区

前言

今天 Review 了一下同事的代码,
发现其代码中有非常多的 mapPartitions,
问其原因,他说性能比 map 更好。
我说为什么性能好呢?
于是就有了这篇文章

网上推崇 mapPartitions 的原因

按照某些文章的原话来说
一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。
又比如说:
如果是普通的map,比如一个partition中有1万条数据;
那么你的function要执行和计算1万次。
但是,使用MapPartitions操作之后,
一个task仅仅会执行一次function,
function一次接收所有的partition数据。
只要执行一次就可以了,性能比较高
这种说法如果按照上面的方式来理解其实也是那么一回事,
但是也很容易让一些新人理解为:
map要执行1万次,而 MapPartitions 只需要一次,这速度杠杠的提升了啊
实际上,你使用MapPartitions迭代的时候,
还是是一条条数据处理的,这个次数其实完全没变。

其实这个问题我们可以来看看源码
map算子源码

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

接受用户传入的一个函数,
new 一个 MapPartitionsRDD 对象,
我们的函数是作用在 MapPartitionsRDD 的迭代器 iter 上。

mapPartition算子源码

  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

接受一个迭代器,
new 一个 MapPartitionsRDD 对象,
传入的迭代器是作为 MapPartitionsRDD 的迭代器。

说白了,这个两算子真没什么差,
map 算子可以理解为 mapPartitions 的一个高级封装而已。

mapPartitions 带来的问题

其实就我个人经验来看,
mapPartitions 的正确使用其实并不会造成什么大的问题,
当然我也没看出普通场景 mapPartitions 比 map 有什么优势,
所以 完全没必要刻意使用 mapPartitions
反而,mapPartitions 会带来一些问题。

  1. 使用起来并不是很方便,这个写过代码的人应该都知道。
    当然这个问题并不是不能解决,我们可以写类似下面的代码,
    确实也变的和 map 简洁性也差不太多,
    恩,我不会告诉你可以尝试在生产环境中用用噢。
    //抽象出一个函数,以后所有的 mapPartitions 都可以用
    def mapFunc[T, U](iterator: Iterator[T], f: T => U) = {
      iterator.map(x => {
        f(x)
      })
    }
    //使用    
    rdd.mapPartitions(x => {
        mapFunc(x, line => {
            s"${line}转换数据"
        })
      })
    
    
  2. 容易造成 OOM,这个也是很多博客提到的问题,
    他们大致会写出如下的代码来做测试,
    rdd.mapPartitions(x => {
        xxxx操作
       while (x.hasNext){
         val next = x.next()
       }
        xxx操作
      })
    
    如果你的代码是上面那样,那OOM也就不足为奇了,
    不知道你注意到了没有,mapPartitions 是接受一个迭代器,
    再返回一个迭代器的,
    如果你这么写代码,就完全没有使用到迭代器的懒执行特性。
    将数据都堆积到了内存,
    真就变成了一次处理一个partition的数据了,
    在某种程度上已经破坏了 Spark Pipeline 的计算模式了。

mapPartitions 到底该怎么用

一对一的普通使用

存在即是道理,
虽然上面一直在吐槽,
但是其确实有存在的理由。
其一个分区只会被调用一次的特性,
在一些写数据库的时候确实很有帮助,
因为我们的 Spark 是分布式执行的,
所以连接数据库的操作必须放到算子内部才能正确的被Executor执行,
那么 mapPartitions 就显示比 map 要有优势的多了。
比如下面这段伪代码

rdd.mapPartitions(x => {
        println("连接数据库")
        val res = x.map(line=>{
          print("写入数据:" + line)
          line
        })
        res
      })

这样我就一个分区只要连接一次数据库,
而如果是 map 算子,那可能就要连接 n 多次了。
不过上面这种就没法关闭数据库连接了,
所以可以换另外一种方式:

rdd1.mapPartitions(x => {
      println("连接数据库")
      new Iterator[Any] {
        override def hasNext: Boolean = {
          if (x.hasNext) {
            true
          } else {
            println("关闭数据库")
            false
          }
        }
        override def next(): Any = "写入数据:" + x.next()
      }
    })

自定义一个迭代器,
这样虽然麻烦了一点,
但是无疑才是正确的。
当然还有一些复杂的处理,
比如类似 flatMap那种要输出多条怎么办?
这个时候可以去参考下 Iterator 的源码是怎么实现的,
同样不难,这里就不赘述了。

一对多的的高级使用

本来是想偷点懒的,
不过既然有人问起这个,
这里就补充说下输出多条的方式。

思路其实很简单,
我们可以查看迭代器的源码,
他是有一个 flatMap 算子的,
我们仿照一下就ok啦。
下面我们来解读下 Iterator.flatMap算子这段的源码吧。

        // f 函数是 传入每一条数据都需要返回一个迭代器
        // 也就是说一条记录可以返回多个值
        def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
          //定义当前的迭代器是空的
          private var cur: Iterator[B] = empty
        //这是源码,为了方便理解,我稍微改写了下
//          def hasNext: Boolean =
//            cur.hasNext || self.hasNext && {
//              cur = f(self.next).toIterator;
//              hasNext
//           }
        def hasNext: Boolean ={
          if(cur.hasNext){
            //如果当前迭代器还有值,
            //则返回true
            return true
          }
          if(self.hasNext){
            //如果cur已经没有值了
            //但是本身的迭代器还有值
            //则我们把本身迭代器的一个值拿出来
            //通过 f函数 构造一个迭代器放到当前的迭代器
            cur = f(self.next).toIterator;
            //再递归一次本函数来看是否还有值
            return hasNext
          }
        }
          //这个就没什么好说的了
          def next(): B = (if (hasNext) cur else empty).next()
        }

上面的代码为了方便理解,
我修改了下,并加了注释,
应该是很好理解了。

这里如果你如果要做伸手党的话,
我也给出一个实例代码

 val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("test")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    sc.parallelize(Seq("a,a,a,a,a"))
      .mapPartitions(iter => {

        new AbstractIterator[String] {
          def myF(data: String): Iterable[String] = {
            println(data)
            data.split(",").toIterable
          }

          var cur: Iterator[String] = Iterator.empty

          override def hasNext: Boolean = {
            cur.hasNext || iter.hasNext && {
              cur = myF(iter.next).toIterator
              hasNext
            }
          }

          override def next(): String = (if (hasNext) cur else Iterator.empty).next()
        }
      })
      .foreach(println)

这里捎带提一下就是,
其实迭代器本身就有 Map flatMap 等算子,
之所以还要去自定义,
就是因为自定义提供了更加自由的一些操作,
比如开启和关闭数据库等,
但是大部分情况下,
还是能不自定义,谁想去折腾呢?

其他

另外一点就是 mapPartitions 提供给了我们更加强大的数据控制力,
怎么理解呢?我们可以一次拿到一个分区的数据,
那么我们就可以对一个分区的数据进行统一处理,
虽然会加大内存的开销,但是在某些场景下还是很有用的,
比如一些矩阵的乘法。

后记

不管你要使用哪个算子,其实都是可以的,
但是大多数时候,我还是推荐你使用 map 算子,
当然遇到一些map算子不合适的场景,
那就没办法了...
不过就算你是真的要使用 mapPartitions,
那么请记得充分发挥一下 迭代器的 懒执行特性。

最后,如果本文对你有帮助,帮忙点个赞呗

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容