44、Spark内核源码深度剖析之Shuffle原理剖析与源码分析

在Spark中,什么情况下,会发生shuffle?reduceByKey、groupByKey、sortByKey、countByKey、join、cogroup等操作

Spark Shuffle操作的两个特点

第一个特点
在Spark早期版本中,那个bucket缓存是非常非常重要的,因为需要将一个ShuffleMapTask所有的数据都写入内存缓存之后,才会刷新到磁盘。但是这就有一个问题,如果map side数据过多,那么很容易造成内存溢出。所以spark在新版本中,优化了,默认那个内存缓存是100kb,然后呢,写入一点数据达到了刷新到磁盘的阈值之后,就会将数据一点一点地刷新到磁盘。
这种操作的优点,是不容易发生内存溢出。缺点在于,如果内存缓存过小的话,那么可能发生过多的磁盘写io操作。所以,这里的内存缓存大小,是可以根据实际的业务情况进行优化的。
第二个特点,
与MapReduce完全不一样的是,MapReduce它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。
但是Spark不需要,spark默认情况下,是不会对数据进行排序的。因此ShuffleMapTask每写入一点数据,ResultTask就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。
spark这种机制的好处在于,速度比mapreduce快多了。但是也有一个问题,mapreduce提供的reduce,是可以处理每个key对应的value上的,很方便。但是spark中,由于这种实时拉取的机制,因此提供不了,直接处理key对应的values的算子,只能通过groupByKey,先shuffle,有一个MapPartitionsRDD,然后用map算子,来处理每个key对应的values。就没有mapreduce的计算模型那么方便。

普通的Shuffle操作原理剖析

普通的Shuffle操作原理剖析.png

每个ShuffleMapTask,都会为每个ResultTask创建一份bucket缓存,以及对应的ShuffleBlockFile磁盘文件
所以假设,如果有100个ShuffleMapTask,100个ResultTask,本地磁盘要产生10000个文件,磁盘IO过多,影响性能
ShuffleMapTask的输出,会作为MapStatus,发送到DAGScheduler的MapOutputTrackerMaster中,MapStatus包含了每个ResultTask要拉取的数据大大小
每个ResultTask会用BlockStoreShuffleFetcher去MapOutputTrackerMaster获取自己的要拉取的文件的信息,然后底层通过BlockManager将数据拉取过来
Map端的数据,可以理解为Shuffle的第一个RDD,MapPartitionsRDD
每个ResultTask拉取过来的数据,其实会组成一个内部的RDD,叫ShuffledRDD,优先放入内存,如果内存不够,那么写入磁盘
然后每个ResultTask针对数据进行聚合,最后生成MapPartitionsRDD,就是我们执行reduceByKey等操作希望获得的那个RDD

优化后的Shuffle原理剖析

优化后的Shuffle原理剖析.png

Spark新版本中,引入了consolidation机制,也就说,提出了ShuffleGroup概念
一个ShuffleMapTask将数据写入ResultTask数量的本地文件,这个不会变,但是,当下一个ShuffleMapTask运行的时候,可以直接将数据写入之前的ShuffleMapTask的本地文件,相当于是,对多个ShuffleMapTask的输出进行了合并,从而大大减少了本地磁盘的数量
一组ShuffleGroup,每个文件中,都存储了多个ShuffleMapTask的数据,每个ShuffleMapTask的数据,叫做一个segment。此外,会通过一些索引,来标记每个ShuffleMapTask的输出在ShuffleBlockFile中的索引,以及偏移量等,来进行不同ShuffleMapTask的数据的区分
机器上,有两个cpu,也就说,4个ShuffleMapTask,有2个ShuffleMapTask是可以并行执行的,先执行的是黑色边框的两个ShuffleMapTask,再执行红色边框的两个ShuffleMapTask
并行执行的ShuffleMapTask,写入的文件,一定是不同的,当一批并行执行的ShuffleMapTask运行完之后,那么新的一批ShuffleMapTask启动起来并行执行的时候,优化机制就开始发挥作用了,开启优化Shuffle操作,主要是通过在SparkConf中,设置一个参数就可以
每一个节点上面,比如有2个cpu,100个ShuffleMapTask,那么此时,就会产生100*100个磁盘文件,就是10000个
开启了consolidation机制之后的Shuffle write
每个节点上的磁盘文件,数量变成了cpu core的数量 * ShuffleMapTask的数量,同样的情况,每个节点只产生200个磁盘文件

源码

Shuffle写

入口

// 这个writer默认是HashShuffleWriter
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

看HashShuffleWriter的writer方法

 // 将每个ShuffleMapTask计算出来的新的rdd的partition数据,写入本地磁盘
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    // 首先判断,是否需要再map端本地进行聚合,这里的话,如果是reduceByKey这种操作,它的dep.aggregator.isDefined、dep.mapSideCombine都是true
    // 那么就会进行map端的本地聚合
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // 这里就会执行本地聚合,比如本地有(hello,1)、(hello,1),那么此时就会聚合成(hello,2)
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    // 需要本地聚合,那么先本地聚合,然后遍历数据,对每个数据,调用partitioner,默认是HashPartitioner
    // 生成bucketId,也就是决定了,每一份数据,要写入哪个bucket中
    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      // 获取到了bucketId之后,会调用shuffleBlockManager.forMapTask()方法,来生成bucketId对应的writer,然后用writer将数据写入bucket
      shuffle.writers(bucketId).write(elem)
    }
  }

上面代码的shuffle其实是

private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
    writeMetrics)

看看forMapTask()方法

// 给每一个mao task 获取一个ShuffleWriterGroup
  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
      writeMetrics: ShuffleWriteMetrics) = {
    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
      private val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      // 对应之前讲解的,shuffle有两种模式,一种是普通的,一种是优化后的
      // 这里会判断,如果开启了consolidation机制,也就是consolidateShuffleFiles为true的话,那么实际上,不会给每个bucket都获取一个独立的文件
      // 而是为这个bucket获取一个ShuffleGroup的writer
      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          // 首先,用shuffleId、mapId、bucketId(也就是reduceId,一个bucket对应一个reduce)生成一个唯一的ShuffleBlockId
          // 然后用buckId,来调用ShuffleFileGroup的apply()函数,为bucket获取一个ShuffleFileGroup
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          // 然后调用BlockManager的getDiskWriter()方法,针对ShuffleFileGroup获取一个writer
          // 这里,就明白了,如果开启了consolidation机制,实际上,对于每一个bucket,都会获取一个针对ShuffleFileGroup的wtriter,而不是一个独立的ShuffleBlockFile的writer
          // 这样就实现了所谓的,多个ShuffleMapTask的输出数据的合并
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }
      } else {
        // 如果没有开启consolidation机制,也就是普通的shuffle操作的话
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          // 同样生成一个ShuffleBlockId
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          // 然后调用BlockManager的diskBlockManager,获取一个代表了要写入本地磁盘文件的blockFile
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          // If so, remove it.
          // 而且会判断,如果blockFile要是存在的话,还得删除它
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          // 然后调用BlockManager的getDiskWriter()方法,针对那个blockFile生成writer
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
        }
        // 所以使用这种普通的shuffle操作的话,对于每一个ShuffleMapTask输出的bucket,都会在本地获取一个单独的ShuffleBlockFile
      }

shuffle读

入口

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    // ResultTask或者ShuffleMapTask在执行到ShuffleRDD时,肯定会调用ShuffleRDD的computer()方法,来计算当前这个RDD的partition的数据
    // 在这里,会调用ShuffleManager的getReader()方法,获取一个HashShuffleReader,然后调用它的read()方法,拉取该ResultTask / ShuffleMapTask需要聚合的数据
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

看看HashShuffleReader的read()方法

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    // reduceTask在拉取数据时,其实会用BlockStoreShuffleFetcher来从DAGDcheduler的MapOutputTrackerMaster中获取自己想要的数据的信息
    // 然后底层,再通过blockManager从对应的位置,拉取需要的数据
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }

看看BlockStoreShuffleFetcher.fetch()方法

private[hash] object BlockStoreShuffleFetcher extends Logging {
  def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    // 拿到全局的blockManager
    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    // 拿到一个全局的MapOutputTracker的引用,调用其getServerStatuses()方法,传入了shuffleId和reduceId
    // shuffleId可以代表当前这个stage的上一个stage,shuffle是分为两个stage的,shuffle writer发生在上一个stage中,shuffle read 是发生在当前stage中的
    // 首先通过shuffleId可以限制到上一个stage的所有ShuffleMapTask的输出的MapStatus,接着,通过reduceId,也就是所谓的bucketId,来限制,从每个mapTask中,获取当前这个resultTask需要获取的每个ShuffleMapTask的输出文件的信息
    // getServerStatuses()方法,一定是走远程网络通信的,因为要联系Driver上的DAGScheduler的MapOutputTrackerMaster
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))

    // 对刚才拉取的数据,进行一些数据结构上的转换操作
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }

    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }

    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Success(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case Failure(e) => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block", e)
          }
        }
      }
    }
    // ShuffleBlockFetcherIterator构造以后,在其内部,就直接根据拉取到的地理位置信息,通过BlockManager去远程ShuffleMapTask所在的节点的BlockManager去拉取数据
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    // 最后,将拉取到的数据,执行一些转换和封装,返回
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

    new InterruptibleIterator[T](context, completionIter) {
      val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
      override def next(): T = {
        readMetrics.incRecordsRead(1)
        delegate.next()
      }
    }
  }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容