Spark Shuffle Write 和Read

本文基于spark源码2.11

1. 前言

shuffle是spark job中一个重要的阶段,发生在map和reduce之间,涉及到map到reduce之间的数据的移动,以下面一段wordCount为例:

def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("Log Query")
    val sc = new SparkContext(sparkConf)
    val lines = sc.textFile("README.md",3)
    val words = lines.flatMap(line => line.split(" "))
    val wordOne = words.map(word => (word,1))
    val wordCount = wordOne.reduceByKey(_ + _,3)
    wordCount.foreach(println)
}

其RDD的转换如下:

wordCount-shuffle.png

上图中map和flatMap这种转换只会产生rdd之间的窄依赖,因此对一个分区上进行map和flatMap可以如同流水线一样只在同一台的机器上尽心,不存在多个节点之间的数据移动,而reduceByKey这样的操作,涉及到需要将相同的key做聚合操作。上图中Stage1中按key做hash 到三个分区做reduce操作,对于Stage1中任意一个partition而言,其输入可能存在与上游Stage0中每一个分区中,因此需要从上游的每一个partition所在的机器上拉取数据,这个过程称为shuffle。

解释一下: spark的stage划分就是以shuffle依赖为界限划分的,上图中只存在一次shuffle操作,所以被划分为两个stage

从上图中可以看出shuffle首先涉及到stage0最后一个阶段需要写出map结果, 以及stage1从上游stage0中每一个partition写出的数据中读取属于当前partition的数据。

2. Shuffle Write

spark中rdd由多个partition组成,任务运行作用于partition。spark有两种类型的task:

  1. ShuffleMapTask, 负责rdd之间的transform,map输出也就是shuffle write
  2. ResultTask, job最后阶段运行的任务,也就是action(上面代码中foreach就是一个action,一个action会触发生成一个job并提交)操作触发生成的task,用来收集job运行的结果并返回结果到driver端。

“关于job的创建,stage的划分以及task的提交在另一篇文章中介绍(待填坑)”

shuffle write的操作发生在ShuffleMapTask#runTask中,其代码如下:

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

调用val (rdd, dep) = ser.deserialize(...)获取任务运行的rdd和shuffle dep,这是在由DAGScheduler序列化然后提交到当前任务运行的executor上的。

调用writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 获得shuffle writer,调用writer.write(rdd.iterator)写出map output。idd.iterator在迭代过程中,会往上游一直追溯当前rdd依赖的rdd,然后从上至下调用rdd.compute()完成数据计算并返回iterator迭代转换计算的结果。 此处manager在SparkEnv中实例化微SortShuffleManager,下面是SortShuffleManager#getWriter方法:

 override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

”上面提到shuffleManager被实例化为SortShuffleManager,老版本里还有HashShuffleManager,似乎不用了,这里有一篇两种方式的性能比较文章SortShuffleManager和HashShuffleManager性能比较

有三种类型的ShuffleWriter,取决于handle的类型。

  1. UnsafeShuflleWriter, 不清楚
  2. BypassMergeSortShuffleWriter, 这个writer会根据reduce的个数n(reduceByKey中指定的参数,有partitioner决定)创建n个临时文件,然后计算iterator每一个key的hash,放到对应的临时文件中,最后合并这些临时文件成一个文件,同时还是创建一个索引文件来记录每一个临时文件在合并后的文件中偏移。当reducer取数据时根据reducer partitionid就能以及索引文件就能找到对应的数据块。
  3. SortShuffleWriter, 会在map做key的aggregate操作,(key,value)会先在保存在内存里,并按照用户自定义的aggregator做key的聚合操作,并在达到一定的内存大小后,对内存中已有的记录按(partition,key)做排序,然后保存到磁盘上的临时文件。最终对生成的文件再做一次merge操作。

2.1 BypassMergeSortShuffleWriter

1. 什么情况下使用
不需要在map端做combine操作,且partitioner产生的分区数量(也就是reducer的个数)小于配置文件中spark.shuffle.sort.bypassMergeThreshold定义的大小(默认值是200)

2. 如何写出map output
下图是BypassMergeSortShuffleWriter写出数据的方式:

shufflewrite.png

输入数据是(nation,city)的键值对,调用reduceByKey(_ + "," + _,3)。运行在在partition-0上的ShuffleMapTask使用BypassMergeSortShuffleWriter#write的过程如下:

  1. 根据reducer的个数(partitioner决定)n 创建n个DiskBlockObjectWriter,每一个创建一个临时文件,临时文件命名规则为temp_shuffle_uuid,也就是每一个临时文件放的就是下游一个reduce的输入数据。
  2. 迭代访问输入的数据记录,调用partitioner.getPartition(key)计算出记录的应该落在哪一个reducer拥有的partition,然后索引到对应的DiskBlockObjectWriter对象,写出key, value
  3. 创建一个名为shuffle_shuffleid_mapid_0.uuid这样的临时且绝对不会重复的文件,然后将1中生成的所有临时文件写入到这个文件中,写出的顺序是partitionid从小到大开始的(这里之所以使用uuid创建文件,主要是不使用uuid的话可能有另外一个任务也写出过相同的文件,文件名中的0本来应该是reduceid,但是由于合并到只剩一个文件,就用0就行了)。
  4. 写出索引文件,索引文件名为shuffle_shuffleid_mapid_0.index.uuid(使用uuid和3中的原因是一样的)。由于map的输出数据被合并到一个文件中,reducer在读取数据时需要根据索引文件快速定位到应该读取的数据在文件中的偏移和大小。
  5. 索引文件只顺序写出partition_0 ~ partition_n的偏移的值
  6. 还需要将3中shuffle_shuffleid_mapid_0.uuid重命名为``shuffle_shuffleid_mapid_0`, 过程是验证一下是不是已经存在这么一个文件以及文件的长度是否等于 1 中所有临时文件相加的大小,不是的话就重命名索引文件和数据文件(去掉uuid)。否则的话表示先前已经有一个任务成功写出了数据,直接删掉临时索引和数据文件,返回。

以上就是BypassMergeSortShuffleWriter写数据的方式。有如下特点:

  1. map端没有按照key做排序,也没有按照key做聚合操作, [(China, Beijing),(China,Hefei),(China,Shanghai)]如果在map端聚合的话会变成(China,“Beijing,Hefei,Shanghai”)。
  2. 如果有M格mapper,N格reducer,那么会产生M*N个临时文件,但是最终会合并生成M个数据文件,M个索引文件。

2.2 SortShuffleWriter

下面是SortShuffleWrite#write方法

override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }
  1. 先创建了一个ExternalSorter,sort.insertAll(records)会将数据写到多个磁盘文件中。
  2. 接下来和BypassMergeSortShuffleWriter类似,创建一个名为shuffle_shuffleid_mapid_0.uuid的这种唯一的临时数据文件,将 1 中的多个磁盘文件合并写出到这个临时数据文件中,并写出索引文件,最终的数据文件中相同分区的数据一定是连续分布的,这样就能根据索引文件中的偏移值快速定位到对应分区的数据。

由于写数据的核心在ExternalSorter#insertAll中,下文会主要介绍ExternalSorter。

1. 什么情况下使用
ShuffleredRDD#mapSideCombine为true,且定义了aggregate的情况下会使用SortShuffleWriter。
2. 原理
根据mapSizeCombine是否为true,SortShuffleWriter在写出map output时也会做不同处理,为true时会按用户自定聚合方法按key聚合,并按照(partitionId,key)排序(没指定key的排序方法时就只根据partitionid排序),然后写出到磁盘文件;为false时不会不会做聚合操作,只会进行排序然后写出到磁盘。下文先介绍没有聚合,然后介绍有聚合。两者之间有很多的共同之处,都会先将数据缓存在内存当中,在达到一定大小之后刷到磁盘,但是最大的区别也在此,他们使用了不同的集合缓存数据。

2.2.1 ExternalSorter

下面是ExternalSorter的一些重要的成员:

1. private val blockManager = SparkEnv.get.blockManager
   写出临时文件到磁盘需要blockManager
2. private var map = new PartitionedAppendOnlyMap[K, C]
   private var buffer = new PartitionedPairBuffer[K, C] 
   下文介绍在map端执行聚合操作和不在map聚合是数据会以不同的方式缓存在内存中,map就是在map端聚合是数据缓存的方式
3. private val keyComparator: Comparator[K] 
    key的比较方式,在map端聚合时,数据排序方式是先按partitionId然后按key排序。不在map聚合时这个字段是空,只按partitionId排序
4. private val spills = new ArrayBuffer[SpilledFile]
    缓存在内存中的数据(map或者buffer)在达到一定大小后就会写出到磁盘中,spills保存了所有写出过的磁盘文件,后续会根据spills做merge成一个文件。

2.2.2 不在map端聚合

下面是ExternalSorter#insertAll的源码:

 def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      ...
      ...
      // 此处省略了map做combine的代码
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

while循环获取(key,value)记录,然后调用buffer.insert(...)插入记录,此处buffer是PartitionedPairBuffer的实例(PartitionedPairBuffer介绍见附录4.1)。insert会将(key,value)转换成((partition_id,key), value)的形式插入,例如("China","Beijing") ->((1, "China"), "Beijing").

maybeSpillCollection则会根据具体情况决定是否将buffer中的记录写出到磁盘。经过如下调用链路进入到写磁盘操作:

maybeSpillCollection (调用buffer.estimateSize 估算当前buffer大小)
          --> mybeSpill  (会尝试扩容)
                --> spill   (写到磁盘中)

下面是spill方法

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }

collection.destructiveSortedWritablePartitionedIterator(comparator)做了很多事情,参数comparator在这种情况下是null。
下面是它的调用序列:

destructiveSortedWritablePartitionedIterator 
   -> partitionedDestructiveSortedIterator
       -> PartitionedPairBuffer#partitionedDestructiveSortedIterator

进入到PartitionedPairBuffer#partitionedDestructiveSortedIterator代码如下:

 override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
    iterator
  }

此处参数keyComparator从前面一直传下来的,此处是空值,因此comparator使用partitionComparator,也就是只按照buffer数据所属的partitionId排序。

Sort#sort方法对buffer排序(排序直接在buffer底层数据上移动,也就是说会破坏buffer原有的数据顺序)之后返回iterator,此时这个iterator迭代出来的数据就是按照partitionId排序的数据,同时也就意味者相同的partitionId的数据一定会连续的分布。

回到上面spill方法,spillMemoryIteratorToDisk接收上面提到的iterator作为参数开始输出磁盘, 这个方法大体如下:

  1. 使用batchSizes保存每批量flush的大小,
  2. elementsPerPartition保存每个partition,键值对个数
  3. 创建临时文件,buffer中记录批量写出,只写出key,value,partitionId不写
  4. 返回SpilledFile,里面有blockId,file,elementsPerPartitionbatchSizes这些信息,后续会将SpilledFile合并成一个文件。

和Bypass方式的区别

两者在写map out数据时都会产生多个临时文件,bypass方式产生的每一个临时文件中的数据指挥是下游一个reducer的输入数据,后续合并成同一个文件时很简单只要逐个将临时文件copy就行,但是sort方式中临时文件中的数据可能输入多个reducer,也就意味着在合并到同一个文件时,需要考虑将多个临时文件相同的分区合并好在输出到最终文件中。关于sort的文件合并会在下一节“map端做聚合”之后。

2.2.3 在map端做聚合

定义聚合方法
reduce转换会是的两个RDD之间存在ShuffleDependency,ShuffleDependency,ShuffleDependency的属性aggregator: Aggregator定义了按key聚合的方式,Aggregator类如下:

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {
...}
  • K,V分别时key、value的类型,C是V聚合后的类型。
  • createCombiner, 第一个value转换成聚合后类型。
  • mergeValue, 并入的value。
  • 合并两个已经聚合的数据。

例如我们将相同key的value(String类型)合并到一个List中,则定义:
createCombiner: (s String) => List(s) 将string转成List
mergeValue: (c:List[String],v: String) => v::c 将string加到列表
mergeCombiners: (c1:List[String],c2: List[String]) => c1:::c2 合并两个列表

write过程
下图是一个map端做聚合的shuffle write过程:

sortshuffle_mapside_combine.png

reduceByKey(_ + "," + _)操作把key相同的所有value用“,”连接起来。

依然是调用ExternalSorter#insertAll完成排序,aggregate以及写出到磁盘的过程。此时使用map作为内存缓存的数据结构。写的流程如下:

  1. 从输入iterator中一次读入(key,value),使用partitioner计算key的partitionid,调用map.insert插入数据,格式为((partitionid,key),value),插入时就会对key相同的做aggregate,形成的内存数据布局如上图map(上图map数据已经排序了,但是插入时不会排序,而是在写出磁盘时排序)。
  2. 当map的数据达到一定大小时,使用blockManager创建临时文件temp_shuffle_uuid,然后对map数据排序,输出到临时文件。排序时现按照partitionid排序,然后按照key排序,保证临时文件中相同partitionid的数据一定是连续分布的。
  3. 完成ExternalSorter#insertAll调用,生成若干临时文件,合并这些文件。

源码解析
源码基本和不做聚合时一样,区别主要是在用作内存缓存的集合buffer和map的区别。附录介绍了buffer和map的原理。

3. ShuffleRead

前面RDD转换图中,RDD#reduceByKey产生了MapPartitionRDD到ShufferedRDD的转换,shuffle read操作发生在转换ShufferedRDD的compute方法中,下面是ShufferedRDD#compute方法:

 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

通过shuffleManager.getReader获得ShuffleReader,返回的是BlockStoreShuffleReader的实例,参数[split.index,split.index+1)表示需要从上游stage0 所有task产生的数据文件中读取split.index这一个分区的记录。

下面是BlockStoreShuffleReader#read方法

/** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val wrappedStreams = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

    val serializerInstance = dep.serializer.newInstance()

    // Create a key/value iterator for each stream
    val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
      // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
      // NextIterator. The NextIterator makes sure that close() is called on the
      // underlying InputStream when all records have been read.
      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    }

    // Update the context task metrics for each record read.
    val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map { record =>
        readMetrics.incRecordsRead(1)
        record
      },
      context.taskMetrics().mergeShuffleReadMetrics())

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // We are reading values that are already combined
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // We don't know the value type, but also don't care -- the dependency *should*
        // have made sure its compatible w/ this aggregator, which will convert the value
        // type to the combined type C
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }
}

这是一个很复杂的方法,从上游的map output读区属于当前分区的block,层层封装迭代器,从上面代码可以看到有如下迭代器:

  1. ShuffleBlockFetcherIterator
    其next方法返回类型为(BlockId, InputStream)。当前reduce分区需要从上游map 输出数据中fetch多个block。这个迭代器负责从上游fetch到blockid中的数据(由于write阶段数据是合并到一个blockid文件中,所以数据是其中一段),然后将从数据创建InputStream,并把blockid以及创建的stream返回。显然如果上游有三个partition,每个partition的输出数据文件中有一段是当前的输入,那这个迭代器三次就结束了。

  2. val recordIter = wrappedStreams.flatMap { ...}
    1 中迭代器产生(BlockId,InputStream),但是作为read 而言spark最终需要的读出一个个(key,value),在 1 的iterator上做一次flatMap将(BlockId,InputStream)转换成(key,value)。
    先是调用serializerInstance.deserializeStream(wrappedStream)使用自定义的序列化方式包装一下1中的输入流,这样就能正常读出反序列化后的对象;然后调用asKeyValueIterator转换成NextIterator,其next方法就反序列化后的流中读出(key,value)。

  3. val metricIter = CompletionIterator...
    这个迭代器包装2中迭代器,next方法也只是包装了2中的迭代器,但是多了一个度量的功能,统计读入多少(key,value)。

  4. InterruptibleIterator, 这个迭代器使得任务取消是优雅的停止读入数据。

  5. val aggregatedIter: Iterator[Product2[K, C]] = if ...
    从前面shuffle write的过程可以知道,即便每一个分区任务写出时做了value的聚合,在reducer端的任务里,由于有多个分区的数据,因此依然还要需要对每个分区里的相同的key做value的聚合。
    这个iterator就是完成这个功能。
    首先,会从4 中迭代器中一个个读入数据,缓存在内存中(map缓存,因为要做聚合),并且在必要时spill到磁盘(spill之前会按key排序)。这个过程和shuffle write中在map端聚合时操作差不多。
    然后, 假设上一部产生了多个spill文件,那么每一个spill文件必然时按key排序的,再对这个spill文件做归并,归并时key相同的进行聚合。
    最后, 迭代器的next返回key以及聚合后的value。

  6. dep.keyOrdering match {...
    5中相同key的所有value都按照用户自定义的聚合方法聚合在一起了,但是iterator输出是按key的hash值排序输出的,用户可能自定义了自己的排序方法。这里又使用了ExternalSorter,按照自定义排序方式排序(根据前面External介绍,可能又会有spill磁盘的操作。。。),返回的iterator按照用户自定义排序返回聚合后的key。

至此shuffle read算是完成。

3.1 Shuffle Read源码解析

层层包装的iterator中,比较复杂的在两个地方:

  1. 上面1中 ShuffleBlockFetcherIterator,从上游依赖的rdd读区分区数据。
  2. 上面5中aggregatedIter,对读取到的各个分区数据做reducer端的aggregate

这里只介绍上面2处。

3.1.1 ShuffleBlockFetchIterator

下面是BlockStoreShuffleReader#read创建该iterator时的代码:

val wrappedStreams = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
  1. blockManager.shuffleClient, 上NettyBlockTranseferService的实例,这在《Spark初始化》文章中介绍过,用来传输datablock。NettyBlockTransferService可以参考《Spark 数据传输》
  2. mapOutputTracker.getXXX返回executorId到BlockId的映射,表示当前partition需要读取的上游的的block的blockid,以及blockid所属的executor。
  3. serializerManager.wrapStream, 反序列化流,上有数据被包装成输入流之后,再使用反序列化流包装之后读出对象。

创建ShuffleBlockFetchIterator时会调用它的initialize方法,该方法如下:

private[this] def initialize(): Unit = {
    // Add a task completion callback (called in both success case and failure case) to cleanup.
    context.addTaskCompletionListener(_ => cleanup())

    // Split local and remote blocks.
    val remoteRequests = splitLocalRemoteBlocks()
    // Add the remote requests into our queue in a random order
    fetchRequests ++= Utils.randomize(remoteRequests)
    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
      ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)

    // Send out initial requests for blocks, up to our maxBytesInFlight
    fetchUpToMaxBytes()

    val numFetches = remoteRequests.size - fetchRequests.size
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

    // Get Local Blocks
    fetchLocalBlocks()
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
  }
  1. splitLocalRemoteBlocks, 根据executorId区分出在本地的的block和远程的block,然后构建出FetchRequest(每一个request可能包含多个block,但是block都是属于一个executor)。
  2. fetchUpToMaxBytes和fetchLocalBlocks,从本地或者远程datablock,数据放在buffer中,包装好buffer放到其成员results(一个阻塞队列)中。

作为iterator,它的next方法每次从results中取出一个,从数据buffer中创建出InputStream,使用wrapStream包装InputStream返回。

3.1.2 aggregatedIter

用来将上游各个partition中的数据在reducer再聚合的,
调用dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)创建aggregatedIter,下面是combineCombinersByKey方法:

 def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

调用ExternalAppendOnlyMap#insertAll将输入数据,这个类和PartitionedAppendOnlyMap原理十分类似,实际上它内部使用

  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]

这个成员来缓存数据,插入数据同时会合并key相同的value,在内存不够时,会保存到磁盘上,返回的iterator则会迭代磁盘中的文件合并的结果,可以参考附录4.2节。

关于ExternalAppendOnlyMap#iterator的介绍见附录4.3 ExternalAppendOnlyMap

4. 附录

4.1 PartitionedPairBuffer

数据存放格式
2.2.2节中说到当不在Map 端做聚合时,ExternalSorter使用buffer作为内存缓存数据时的数据结构,调用buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])插入数据记录。插入数据时将(key,value)转换成((partition-id,key), value)的形式插入。

下面PartitionedPairBuffer的核心属性:

private var capacity = initialCapacity
private var curSize = 0
private var data = new Array[AnyRef](2 * initialCapacity)
  1. data是一个数据,就是PartitionedPairBuffer底层用来存储数据,其初始长度是0。

下面是PartitionedPairBuffer的insert方法

def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }

依次插入key,和value。因此PartitionedPairBuffer中数据排列的方式

_______________________________________________________
| key1 | value1 | key2 | value2 | ... | keyN | valueN |
_______________________________________________________

数据是连续分布的。

数据排序
ExternalSorter使用buffer的size达到一定大小后会将buffer中数据spill到磁盘,在此之前需要对乱序的data数据排序。
PartitionedPairBuffer#partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
方法对data数据中的数据进行排序,按照key排序,参数keyComparator定义key的比较方式。

在ExternalSorter中,data数组中key是(partition-id,key)。keyComparator取partition-id比较大小排序。这样就保证相同的partition-id连续分布在写到磁盘中的文件中。

排序所用的算法为timsort(优化后的归并排序),参考timsort wiki

4.2 PartitionedAppendOnlyMap

2.2.3 节中介绍当shuffle write对写出的数据做map端聚合时,用来做内存缓存数据的数据结构式map。
数据存放格式

PartitionedAppendOnlyMap类有如下继承关系:

AppendOnlyMap
           ^
           |
SizeTrackingAppendOnlyMap  WritablePartitionedPairCollection
                     ^                             ^
                     |                             |
                     _______________________________
                                        ^
                                        |
                             PartitionedAppendOnlyMap 

2.2.3节中ExternalSorter向map中插入数据的代码如下:

insertAll(...){
...
if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    }
...
}
mergeValue,createCombiner即定义在Aggregator中合并value的函数。
调用的map.changeValue插入数据,这个方法还传入的参数update函数,
map调用changeValue插入数据时,会首先调用update,update做如下判断:
1. 若key之前已经在map中(hadValue=true),调用mergeValue合并key相同的value
2. key不存在(hadValue=false),转换value。

所以综上所述,在map端按key聚合就是在插入数据的过程的完成的。

调用PartitionedAppednOnlyMap#insert(),会有下面调用链:

PartitionedAppendOnlyMap#changeValue(key,value)
   -> SizeTrackingAppendOnlyMap#changeValue( (partition-id,key), value) 和buffer插入一样,将key转换(partition-id,key)
       ->AppendOnlyMap#changeValue( (partition-id,key),value )

底层数据结构在AppendOnlyMap中,AppendOnlyMap有如下属性:

  private var data = new Array[AnyRef](2 * capacity)

底层存储数据依然使用data数组。

下面是AppendOnlyMap#changeValue方法:

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
       // curKey是null,表示没有插入过相同的key,不需要合并
       // updateFunc就是上面提到的update,合并value的
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
       // curKey不是null,表示有插入过相同的key,需要合并
       // updateFunc就是上面提到的update,合并value的
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

上面代码中对于待插入的(key,value),不像buffer中那样直接放在数据尾部,而是调用pos = rehash(...)确定插入的位置,因此其底层的数据可能是下面这样的:

______________________________________________________________________
| key1 | value1 |   |   | key2 | value2 | ... | keyN | valueN |    |
______________________________________________________________________

使用hash的方式确定位置,意味着数据不是连续的,存在空槽。

数据排序
和buffer排序有点区别,buffer由于数据是连续分布,没有空槽,timsort可以直接在数组上排序。但是map由于空槽的存在,需要先将数据聚拢在一起,然后使用和buffer一样的排序。

4.3 ExternalAppendOnlyMap

它有如下核心成员:

 @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
  private val spilledMaps = new ArrayBuffer[DiskMapIterator]
  1. currentMap是其内部用来缓存数据
  2. spilledMaps,currentMap的size达到一定大小之后,会将数据写到磁盘,这个里面保存了用来迭代返回磁盘文件中(key,value)。

这里主要介绍ExternalAppendOnlyMap#iterator。下面是iterator方法:

 override def iterator: Iterator[(K, C)] = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
    }
    if (spilledMaps.isEmpty) {
      CompletionIterator[(K, C), Iterator[(K, C)]](
        destructiveIterator(currentMap.iterator), freeCurrentMap())
    } else {
      new ExternalIterator()
    }
  }
  1. spilledMap.isEmpty表示内存够用,没有spill到磁盘,这个时候比较好办不需要再将磁盘文件合并的,直接在底层存储结构currentMap上迭代就行了。
  2. 否则,需要合并磁盘文件,创建ExternalIterator用来合并文件。

ExternalIterator
对spill到磁盘文件做外部归并的。
它有如下成员:

 private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

    // Input streams are derived both from the in-memory map and spilled maps on disk
    // The in-memory map is sorted in place, while the spilled maps are already in sorted order
    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
  1. inputStreams, sortedMap是当前内存currentMap的迭代器,spilledMaps是磁盘文件的迭代器,将这些迭代器转换成BufferedIterator(可以预读下一个数据,而移动迭代器)。
  2. mergeHeap,小根堆。

ExternalIterator实例化话调用如下方法:

 inputStreams.foreach { it =>
      val kcPairs = new ArrayBuffer[(K, C)]
      readNextHashCode(it, kcPairs)
      if (kcPairs.length > 0) {
        mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
      }
  1. readNextHashCode,连续读区it迭代器中相同的key的所有记录,碰到不同key时停止
  2. mergeHeap.enqueue,将1中的所有(key,value)包装方入小根堆中。StreamBuffer重写了comparaTo方法,按照key的hash值排序。key小的就在堆顶端。
  3. foreach,对每一个待归并的文件,每次取出其靠前的key相同的连续记录,放到小根堆

接下来是其next方法:

override def next(): (K, C) = {
      if (mergeHeap.isEmpty) {
        throw new NoSuchElementException
      }
      // Select a key from the StreamBuffer that holds the lowest key hash
      //从堆中取出key hash最小的(key, value)序列
      val minBuffer = mergeHeap.dequeue()
      val minPairs = minBuffer.pairs
      val minHash = minBuffer.minKeyHash
      // 从(key,value)序列中取下第一个(key,value)记录
      val minPair = removeFromBuffer(minPairs, 0)
      val minKey = minPair._1
      var minCombiner = minPair._2
      assert(hashKey(minPair) == minHash)

      
      val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
      // 判断堆中当前key hash最小的和刚刚取出来的第一个记录hash是
      //不是一样,是一样则有可能是同一个key,但也可能不是同一个
      // key,因为在inputStreams.foreach中是使用hashcode判断key
   // 相等的,和reducer端则是使用==判断。
      while (mergeHeap.nonEmpty && mergeHeap.head.minKeyHash == minHash) {
       
        val newBuffer = mergeHeap.dequeue()
        // 可能需要合并,newBuffer中存放的是key的hashCode相等
       // 的序列,但是key1==minKey不一定成立,所以可能只会合并
       // minBuffer和newBuffer中的一部分数据
        minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
        mergedBuffers += newBuffer
      }

      
    // 前面说到buffer中数据可能只会有一部分合并,对于没有合并的
   // 还需要重新添加到堆中,等待下一轮合并
      mergedBuffers.foreach { buffer =>
        if (buffer.isEmpty) {
         // minBuffer和newBuffer全部合并了,那可以从迭代器中读区下.    
// 一批key的hashcode一样的连续记录了
          readNextHashCode(buffer.iterator, buffer.pairs)
        }
        if (!buffer.isEmpty) {
          mergeHeap.enqueue(buffer)
        }
      }
      
      (minKey, minCombiner)
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容