spark源码阅读——shuffle读

DAGScheduler在拆分任务的时候如果发现需要shuffle则会把之前RDD运算产生的结果输出到本地磁盘中(详细的会在以后的文章分析)。

紧接着就需要对Shuffle后的结果分别进行运算了(比如说count
那么接着之前的RDD会有一个ShuffledRDD来处理shuffle之后的结果。
(实际上是一个新的Stage

同样在这个Stage会把任务拆分成Task并发送给Executor

这里拆分成的TaskResultTask实际上也很简单,任务反序列化之后执行ShuffledRDD.iterator -> ShuffledRDD.compute

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

ShuffleManager获取一个Reader读取之前Shuffle输出的数据进行运算。
实际这个Reader是一个BlockStoreShuffleReader
这个类会做什么呢?

  • 首先任务是计算我这个partition的结果,我需要知道之前依赖的partition的数据的位置(MapOutputTracker)
  • 根据位置获取依赖的数据。(BlockManager)
  • 如果需要combine则执行聚合逻辑
  • 如果需要排序则排序(ExternalSorter)
/**
 * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
 * requesting them from other nodes' block stores.
 */
private[spark] class BlockStoreShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    serializerManager: SerializerManager = SparkEnv.get.serializerManager,
    blockManager: BlockManager = SparkEnv.get.blockManager,
    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
  extends ShuffleReader[K, C] with Logging {

override def read(): Iterator[Product2[K, C]] = {
    val wrappedStreams = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      ...
   )

    val serializerInstance = dep.serializer.newInstance()

    // Create a key/value iterator for each stream
    val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
       ...
      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    }

       ...
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
         ...
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
       ...      
       interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data.
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        ...
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }

mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
这个方法返回的参数类型是Seq[(BlockManagerId, Seq[(BlockId, Long)])]
也就是说在哪个Executor的哪个位置上保存着需要的数据信息,数据大小是多少。

(每个Executor都有一个SparkEnv,每个SparkEnv都包含一个BlockManagerDriver端的BlockManager是Master,Executor端的BlockManager是slave,集群内部的BlockManager构成了一个master-slave模式的集群,以后会说)

ShuffleBlockFetcherIterator

这个对象实际上就负责的从远程节点拉取所有数据的任务。
首先到了initialize方法

  • 注册回调,方便任务结束后清理内存(ByteBuffer)
  • 分离请求,有的数据实际可能保存在本地
  • 获取远程数据块
  • 获取本地数据块
private[this] def initialize(): Unit = {
    
    context.addTaskCompletionListener(_ => cleanup())
    ...
    val remoteRequests = splitLocalRemoteBlocks()
    ...
    fetchRequests ++= Utils.randomize(remoteRequests)
    ...
    fetchUpToMaxBytes()
    ...
    fetchLocalBlocks()
    ...
  }

这里将数据块信息拆分成获取任务的列表,这里有一个优化,为了加快获取速度,会将同一个文件拆分成多个请求同时获取。

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
    // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
    // nodes, rather than blocking on reading output from one node.

    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
      + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)

    // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
    // at most maxBytesInFlight in order to limit the amount of data in flight.
    val remoteRequests = new ArrayBuffer[FetchRequest]

    // Tracks total number of blocks (including zero sized blocks)
    var totalBlocks = 0
    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      if (address.executorId == blockManager.blockManagerId.executorId) {
        // Filter out zero-sized blocks
        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
        numBlocksToFetch += localBlocks.size
      } else {
        val iterator = blockInfos.iterator
        var curRequestSize = 0L
        var curBlocks = new ArrayBuffer[(BlockId, Long)]
        while (iterator.hasNext) {
          val (blockId, size) = iterator.next()
          if (size > 0) {
            curBlocks += ((blockId, size))
            remoteBlocks += blockId
            numBlocksToFetch += 1
            curRequestSize += size
          } else if (size < 0) {
            throw new BlockException(blockId, "Negative block size " + size)
          }
          if (curRequestSize >= targetRequestSize ||
              curBlocks.size >= maxBlocksInFlightPerAddress) {
            remoteRequests += new FetchRequest(address, curBlocks)
            curBlocks = new ArrayBuffer[(BlockId, Long)]
            curRequestSize = 0
          }
        }
        // Add in the final request
        if (curBlocks.nonEmpty) {
          remoteRequests += new FetchRequest(address, curBlocks)
        }
      }
    }
    remoteRequests
  }

拆分完获取任务就要直接开始获取任务了。这里是有一个请求速率控制的机制在里面,分别是maxBytesInFlight和maxRequestInFlight,如果能发送请求则发送,否则放到延迟队列中,等待下一次调用这个方法的时候去发送请求。


private def fetchUpToMaxBytes(): Unit = {
    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
    // immediately, defer the request until the next time it can be processed.

    // Process any outstanding deferred fetch requests if possible.
    if (deferredFetchRequests.nonEmpty) {
      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
        while (isRemoteBlockFetchable(defReqQueue) &&
            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
          val request = defReqQueue.dequeue()
          logDebug(s"Processing deferred fetch request for $remoteAddress with "
            + s"${request.blocks.length} blocks")
          send(remoteAddress, request)
          if (defReqQueue.isEmpty) {
            deferredFetchRequests -= remoteAddress
          }
        }
      }
    }

    // Process any regular fetch requests if possible.
    while (isRemoteBlockFetchable(fetchRequests)) {
      val request = fetchRequests.dequeue()
      val remoteAddress = request.address
      if (isRemoteAddressMaxedOut(remoteAddress, request)) {
        logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
        val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
        defReqQueue.enqueue(request)
        deferredFetchRequests(remoteAddress) = defReqQueue
      } else {
        send(remoteAddress, request)
      }
    }

    def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
      sendRequest(request)
      numBlocksInFlightPerAddress(remoteAddress) =
        numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
    }

    def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
      fetchReqQueue.nonEmpty &&
        (bytesInFlight == 0 ||
          (reqsInFlight + 1 <= maxReqsInFlight &&
            bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
    }

    // Checks if sending a new fetch request will exceed the max no. of blocks being fetched from a
    // given remote address.
    def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
      numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
        maxBlocksInFlightPerAddress
    }
  }

发送请求实际借助了ShuffleClient这个类,这个类会使用NettyBlockTransferService这个类向远程的BlockManager发起获取数据块请求。
整个过程是异步的,并在发请求的时候增加了回调。回调便是在获取结束之后把结果放到一个LinkedBlockingQueue里面。

之后便是获取本地的数据块,从本地的BlockManager直接获取即可。

但实际上这个类是一个Iterator,会不断被外部调用next()方法。
next方法实际是阻塞的,因为如果results这个队列是空的则一直阻塞在这里。
每次从队列中获取一个获取结果之后做相应的处理,包装成一个InputStream
每次调用next的时候都会获取一个BlockId和相应的文件流而无需考虑这个文件块是否是远程和本地,因为每次调用next的时候都会调用这个fetchUpToMaxBytes方法,保证远程数据可以一直被获取。

override def next(): (BlockId, InputStream) = {
    if (!hasNext) {
      throw new NoSuchElementException
    }
    numBlocksProcessed += 1

    var result: FetchResult = null
    var input: InputStream = null
    while (result == null) {
           ...
      result = results.take()
           ...
      result match {
        case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
          if (address != blockManager.blockManagerId) {
            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
           ...
          }
          bytesInFlight -= size
          if (isNetworkReqDone) {
            reqsInFlight -= 1
          }

          val in = try {
            buf.createInputStream()
          } catch {
            case e: IOException =>
              buf.release()
              throwFetchFailedException(blockId, address, e)
          }

          input = streamWrapper(blockId, in)
           ...
          }
      }
      fetchUpToMaxBytes()   // <-----------------
    }

    currentResult = result.asInstanceOf[SuccessFetchResult]
    (currentResult.blockId, new BufferReleasingInputStream(input, this))
  }

流程梳理

ResultTaskExecutor上运行,调用ShuffledRDDiterator方法。这个方法从ShuffleManager获取一个BlockStoreShuffleReader,这个Reader内部负责了获取远程Shuffle输出文件的任务,获取之后根据combine,排序等处理数据,完成后续的运算。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容

  • 1、 性能调优 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所...
    Frank_8942阅读 4,542评论 2 36
  • 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所能分配的CPU数...
    miss幸运阅读 3,181评论 3 15
  • spark-submit的时候如何引入外部jar包 在通过spark-submit提交任务时,可以通过添加配置参数...
    博弈史密斯阅读 2,742评论 1 14
  • 原文:https://tech.meituan.com/spark-tuning-pro.html Spark性能...
    code_solve阅读 1,191评论 0 34
  • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AM...
    大佛爱读书阅读 2,828评论 0 20