[第十五章]Shuffle的读写源码剖析_4

上两节我们讲了普通shuffle的操作原理,与优化后的操作原理。并对比了他们各自的特别。那么我就了解到spark shuffle其实是进行了两步
第一步,ShuffleMapTask执行后把计算出来的数据写入ShuffleBlockFile里
第二步,ResultTask读取这些数据文件进行计算。
节章节就是深入剖析这两步的源码。

我们在前面讲过Executor在执行Task时,调用runTask方法,并返回MapStatus

 try {
      //获取shuffleManager,在用shuffleManager获取shuffleWriter对象
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //最重要的代码在这里
      /**
       * 首先调用了rdd.iterator方法,参数是传入了当前 task要处理的partition
       * 所以核心的逻辑,就是rdd的iterator方法中,在这里,就实现了针对RDD的某个partitione,执行
       * 我们定义的算子或者函数
       * 当执行完我们自定义的算子或者函数,是不是相当于针对rdd的partiton执行了处理,那么是不是有返回值
       * ok,返回的数据,都是通过ShuffleWriter,经过HashPartitioner进行分区后,写入自己对应的bucket
       */
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      /**
       * 最后,结果返回MapStatus
       * Mapstatus封装了ShuffleMapTask计算 后的数据,存储在哪里,这其实就是BlockManager相关的信息
       * BlockManager是spark底层的内存,数据,磁盘管理的组件
       * 讲完shuffle,我们会讲blockManager
       */
      return writer.stop(success = true).get
    } 

里面讲了
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
其实这个writer就是HashSuffleWriter.里write方法里,我们首先要判断是否需要在Map端本地聚会。是什么情况下可以聚合呢,这要看我们实际的业务,比如前面说的reduceBykey。

  //将每个ShuffleMapTask计算出来的rdd的partition数据,写入本地磁盘
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    //首先要判断,是否需要在map端本地聚合
    //这里包括reduceByKey,这样的操作,dep.aggreatetor.isDefined是true
    //包括dep.mapSideCombine也是true
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        //这里是执行本地聚会
        //比如本地:(hello,1)(hello,1) ==> (hello,2)
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

那么这里说到了本地聚合,也可以叫组合(combine),有什么用呢,大家想一相,是不是这里要走网络传递了,要是本地聚合后,大大减少了网络流量 了
接着我们看源码:

//如果本地聚会就本地聚会,
    //然后遍历数据
    //对每个数据,调用partitioner,默认是HashPartitioner,生成bucketId
    //也就是决定 了,每一份数据写入哪个bucket中
    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
     // 调用shuffleBlockManager.forMapTask来生成bucketId对应的Writer,然后把数据写入bucket
      shuffle.writers(bucketId).write(elem)
    }
  }

接着我们看forMapTask的方法。这个方法里就看到我们前面两节讲的普通shuffle与优化后的shuffle写入本地磁盘的区别。

ivate val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      //这里很关键,前面我们讲过Shuffle有两种,一种是普通的,一种是优化后的
      //这里会判断,如果开启了consolidate,就是consoldateShuffleFile是true
      //这里不会为每个bucket都获取一个独立的文件
      //而是为这个bucket,获取一个ShuffleGroup的Writer
      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          //首先用shuffleId, mapId, bucketId来生成一个唯一的blockId
          //然后用bucket获取一个ShuffleGroup
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //然后用BlockManager.getDiskWriter方法,针对ShuffleGroup获取一个Writer
          /**
           * 这样我们就清楚了,如果开启了consolidate机制
           * 实际上,对于每一个bucket,都会获取一个针对ShuffleFileGroup的writer
           * 而不是一个独立的ShuffleBlockFile的writer
           * 
           *  这样就实现了多个ShuffleMapTask的输出 数据的合并
           */
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }} 

上面讲了当开启了consolidate机制后,对于每一个bucket都会获取一个ShuffleFileGroup的writer
而普通的shuffle的源码如下:

else {
        //如果没有开启consolicate机制,也就是普通的Shuffle
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          //同样生成一个blockId
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //然后调用  blockManager.diskBlockManager,获取了一个代表要写入磁盘文件的blockFile
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          //然后调用 这个方法,针对那个blockFile生成writer
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
        
          /**
           *所以对于普通的Shuffle操作的话
           * 对于每个ShuffleMapTask输出的bucket,都会在本地获取一个单独的ShuffleBlockFIle文件
           */
        }

我们看到了不管是哪种的shuffle,最终调用blockMamager.getDiskWriter方法写数据到本地磁盘。这就是Shuffle的第一步,写文件数据操作。

接下来我们看第二步,
在前面分析Task是不是分析了task的计算 ,调用了compute方法,里面是不是通过getReader方法得到ShuffleMapReader,调用read方法

 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    //这里和我们以前分析的图串起来了吧DAGScheduler的MapoutputTrackerMaster中获取自己想要的数据信息
    //然后底层用 blockMamger拉取自己的数据
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

用blockManager拉取自己的数据,调用ftech方法:

 def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    /**
     * 拿到mapOutputTracker的引用 ,然后调用getServerStatuses
     * suffleId表示这个stage的上一个stage的ID
     * reduceId是bucketId
     * 这两个参数可以限制找到当前resultTask获取所需要的那份数据
     * getServerStatuses这个方法一定会走网络通信的,因为要联系dirver的
     */
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

    //ShuffleBlockFetcherIterator构造 以后,就直接根据拉取地睛位置信息,通过BlockMamager
    //去远程的ShuffleTask所以节点的blockManager去拉取数据
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    //对拉取的数据进行封装
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

这就是ResultTask读取数据的过程。在spark中Shuffle是重点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • 场景 数据倾斜解决方案与shuffle类性能调优 分析 数据倾斜 有的时候,我们可能会遇到大数据计算中一个最棘手的...
    过江小卒阅读 3,434评论 0 9
  • 本文基于spark源码2.11 1. 前言 shuffle是spark job中一个重要的阶段,发生在map和re...
    aaron1993阅读 11,698评论 1 12
  • 1 数据倾斜调优 1.1 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spar...
    wisfern阅读 2,934评论 0 23
  • Spark 数据倾斜的解决办法 调优概述 转载:http://blog.csdn.net/lw_ghy/artic...
    raincoffee阅读 1,131评论 0 6
  • 下雨天 女孩打着粉色小伞 朝着梦境走去··· 雨后的世界 那么干净,那么清新 阳光透过指缝 徜徉于脸庞 投影了一地...
    曹小七阅读 289评论 0 4