Spark Sort Shuffle Read

Shuffle read does not start until the mapper stage has finished. Data is processed as it is fetched: records are kept in memory first and only written out to disk at the end. The detailed shuffle read process is described below, followed by an analysis of the memory it uses.

The shuffle read process

  1. ShuffledRDD.compute()
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {  
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]  
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)  
    .read()  
    .asInstanceOf[Iterator[(K, C)]]  
} 

getReader() returns a HashShuffleReader, whose read() method then reads the data produced by the ShuffleMapTasks of the previous stage.
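
Even though this is the sort-based shuffle, the reader handed back is a HashShuffleReader: in Spark 1.5 the read path is shared between the hash- and sort-based shuffle managers. For reference, SortShuffleManager.getReader() looks roughly as follows (lightly abridged from the 1.5 source):

override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  // The sort-based shuffle reuses the same block-store shuffle fetcher as the hash-based one.
  new HashShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}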

  2. HashShuffleReader.read()
override def read(): Iterator[Product2[K, C]] = {  
    val ser = Serializer.getSerializer(dep.serializer)  
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)  
  
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {  
      if (dep.mapSideCombine) {  
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))  
      } else {  
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))  
      }  
    } else {  
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")  
  
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect  
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))  
    }  
  
    // Sort the output if there is a sort ordering defined.  
    dep.keyOrdering match {  
      case Some(keyOrd: Ordering[K]) =>  
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,  
        // the ExternalSorter won't spill to disk.  
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))  
        sorter.insertAll(aggregatedIter)  
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)  
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)  
        sorter.iterator  
      case None =>  
        aggregatedIter  
    }  
  }  

The main steps of reading the data are: 1. obtain an iterator over the blocks to be fetched; 2. combine the records with an AppendOnlyMap/ExternalAppendOnlyMap, the same structures used during shuffle write; 3. if the keys need to be sorted, run the records through an ExternalSorter. groupByKey, reduceByKey and aggregateByKey do not require sorting by key, whereas sortByKey does (see the sketch below for which branch each operator takes). The rest of this section focuses on how the iterator is obtained.
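
A minimal sketch of which branch of read() each common operator exercises (illustrative only; it assumes an existing SparkContext sc):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey: aggregator defined, mapSideCombine = true
//   -> read() calls combineCombinersByKey(); keyOrdering is None, so no ExternalSorter.
val reduced = pairs.reduceByKey(_ + _)

// groupByKey: aggregator defined, mapSideCombine = false
//   -> read() calls combineValuesByKey(); again no sorting by key.
val grouped = pairs.groupByKey()

// sortByKey: no aggregator, but keyOrdering = Some(ordering)
//   -> read() feeds the fetched records into an ExternalSorter.
val sorted = pairs.sortByKey()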

  3. BlockStoreShuffleFetcher.fetch()
def fetch[T](  
      shuffleId: Int,  
      reduceId: Int,  
      context: TaskContext,  
      serializer: Serializer)  
    : Iterator[T] =  
  {  
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))  
    val blockManager = SparkEnv.get.blockManager  
  
    val startTime = System.currentTimeMillis  
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)  
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(  
      shuffleId, reduceId, System.currentTimeMillis - startTime))  
  
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]  
    for (((address, size), index) <- statuses.zipWithIndex) {  
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))  
    }  
  
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {  
      case (address, splits) =>  
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))  
    }  
  
    val blockFetcherItr = new ShuffleBlockFetcherIterator(  
      context,  
      SparkEnv.get.blockManager.shuffleClient,  
      blockManager,  
      blocksByAddress,  
      serializer,  
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility  
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)  
    val itr = blockFetcherItr.flatMap(unpackBlock)  
  
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

    new InterruptibleIterator[T](context, completionIter)
  }

fetch() constructs a ShuffleBlockFetcherIterator, whose initialize() method reads data from both local and remote block managers.

  4. ShuffleBlockFetcherIterator.initialize()
private[this] def initialize(): Unit = {  
    // Add a task completion callback (called in both success case and failure case) to cleanup.  
    context.addTaskCompletionListener(_ => cleanup())  
  
    // Split local and remote blocks.  
    val remoteRequests = splitLocalRemoteBlocks()  
    // Add the remote requests into our queue in a random order  
    fetchRequests ++= Utils.randomize(remoteRequests)  
  
    // Send out initial requests for blocks, up to our maxBytesInFlight  
    while (fetchRequests.nonEmpty &&  
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {  
      sendRequest(fetchRequests.dequeue())  
    }  
  
    val numFetches = remoteRequests.size - fetchRequests.size  
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))  
  
    // Get Local Blocks  
    fetchLocalBlocks()  
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))  
  }  

initialize() first separates local blocks from remote ones (each block holds the output that one map task wrote for this reduce partition); sendRequest() fetches the remote blocks, while fetchLocalBlocks() reads the local ones. To balance bandwidth and memory use, remote requests are sized so that data can be fetched from up to 5 nodes in parallel, and the total amount of data in flight is bounded by spark.reducer.maxSizeInFlight (48m by default, as seen in the fetch() code above). The grouping of blocks into requests is sketched below; the sendRequest() flow is then described in detail.
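
The grouping happens in splitLocalRemoteBlocks(). Below is a simplified, self-contained sketch of that logic (Address and FetchReq are hypothetical stand-ins for Spark's internal BlockManagerId and FetchRequest): each request is capped at maxBytesInFlight / 5 bytes, so up to five requests, typically to different nodes, can be outstanding at the same time.

import scala.collection.mutable.ArrayBuffer

case class Address(host: String, port: Int)                        // stand-in for BlockManagerId
case class FetchReq(address: Address, blocks: Seq[(String, Long)]) // stand-in for FetchRequest

def groupIntoRequests(
    remoteBlocksByAddress: Seq[(Address, Seq[(String, Long)])],
    maxBytesInFlight: Long): Seq[FetchReq] = {
  // Keep each request well below maxBytesInFlight so several can run in parallel.
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = new ArrayBuffer[FetchReq]
  for ((address, blocks) <- remoteBlocksByAddress) {
    var curBlocks = new ArrayBuffer[(String, Long)]
    var curRequestSize = 0L
    for ((blockId, size) <- blocks if size > 0) {   // zero-sized blocks are skipped
      curBlocks += ((blockId, size))
      curRequestSize += size
      if (curRequestSize >= targetRequestSize) {
        // Close the current request once the cap is reached and start a new one.
        requests += FetchReq(address, curBlocks)
        curBlocks = new ArrayBuffer[(String, Long)]
        curRequestSize = 0L
      }
    }
    if (curBlocks.nonEmpty) requests += FetchReq(address, curBlocks)
  }
  requests
}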

  5. Fetching remote data: ShuffleBlockFetcherIterator.sendRequest()
private[this] def sendRequest(req: FetchRequest) {  
    logDebug("Sending request for %d blocks (%s) from %s".format(  
      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))  
    bytesInFlight += req.size  
  
    // so we can look up the size of each blockID  
    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap  
    val blockIds = req.blocks.map(_._1.toString)  
  
    val address = req.address  
    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,  
      new BlockFetchingListener {  
        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {  
          // Only add the buffer to results queue if the iterator is not zombie,  
          // i.e. cleanup() has not been called yet.  
          if (!isZombie) {  
            // Increment the ref count because we need to pass this to a different thread.  
            // This needs to be released after use.  
            buf.retain()  
            results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))  
            shuffleMetrics.incRemoteBytesRead(buf.size)  
            shuffleMetrics.incRemoteBlocksFetched(1)  
          }  
          logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))  
        }  
  
        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {  
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)  
          results.put(new FailureFetchResult(BlockId(blockId), e))  
        }  
      }  
    )  
  }  

The shuffleClient's fetchBlocks() method reads the remote data. ShuffleClient has two subclasses, ExternalShuffleClient and BlockTransferService; BlockTransferService in turn has two subclasses, NettyBlockTransferService and NioBlockTransferService. In Spark 1.5.2 NioBlockTransferService is already marked as deprecated and will be removed in a later release.
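
Which implementation a reducer actually uses is decided when the BlockManager is created (the shuffleClient field referenced in the fetch() code above): with the external shuffle service enabled (spark.shuffle.service.enabled=true) an ExternalShuffleClient is used, otherwise the executor's own BlockTransferService. A minimal sketch of that choice, not the actual BlockManager code:

// Hypothetical helper illustrating the selection; the real field is BlockManager.shuffleClient.
def chooseShuffleClient(
    externalShuffleServiceEnabled: Boolean,
    externalShuffleClient: => AnyRef,    // ExternalShuffleClient
    blockTransferService: => AnyRef      // NettyBlockTransferService or NioBlockTransferService
  ): AnyRef =
  if (externalShuffleServiceEnabled) externalShuffleClient else blockTransferService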

  6. ExternalShuffleClient.fetchBlocks()
public void fetchBlocks(  
    final String host,  
    final int port,  
    final String execId,  
    String[] blockIds,  
    BlockFetchingListener listener) {  
  assert appId != null : "Called before init()";  
  logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);  
  try {  
    RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =  
      new RetryingBlockFetcher.BlockFetchStarter() {  
        @Override  
        public void createAndStart(String[] blockIds, BlockFetchingListener listener)  
            throws IOException {  
          TransportClient client = clientFactory.createClient(host, port);  
          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();  
        }  
      };  
  
    int maxRetries = conf.maxIORetries();  
    if (maxRetries > 0) {  
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's  
      // a bug in this code. We should remove the if statement once we're sure of the stability.  
      new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();  
    } else {  
      blockFetchStarter.createAndStart(blockIds, listener);  
    }  
  } catch (Exception e) {  
    logger.error("Exception while beginning fetchBlocks", e);  
    for (String blockId : blockIds) {  
      listener.onBlockFetchFailure(blockId, e);  
    }  
  }  
}  

OneForOneBlockFetcher.start() is then invoked, which fetches the data one chunk at a time.

  7. OneForOneBlockFetcher.start()
public void start() {  
    if (blockIds.length == 0) {  
      throw new IllegalArgumentException("Zero-sized blockIds array");  
    }  
  
    client.sendRpc(openMessage.toByteArray(), new RpcResponseCallback() {  
      @Override  
      public void onSuccess(byte[] response) {  
        try {  
          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteArray(response);  
          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);  
  
          // Immediately request all chunks -- we expect that the total size of the request is  
          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].  
          for (int i = 0; i < streamHandle.numChunks; i++) {  
            client.fetchChunk(streamHandle.streamId, i, chunkCallback);  
          }  
        } catch (Exception e) {  
          logger.error("Failed while starting block fetches after success", e);  
          failRemainingBlocks(blockIds, e);  
        }  
      }  
  
      @Override  
      public void onFailure(Throwable e) {  
        logger.error("Failed while starting block fetches", e);  
        failRemainingBlocks(blockIds, e);  
      }  
    });  
  }  

Memory analysis of Spark sort shuffle read:

In ExternalAppendOnlyMap, the object that buffers data in memory is:

private var currentMap = new SizeTrackingAppendOnlyMap[K, C]

In ExternalSorter, the object that buffers data in memory is:

private var map = new PartitionedAppendOnlyMap[K, C]

So on a worker node, the main memory a reducer task occupies is a SizeTrackingAppendOnlyMap plus a PartitionedAppendOnlyMap. If several reducer tasks run on the same worker node (at most as many as the number of cores), the maximum memory that shuffle read can use is

ExecutorHeapMemory * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction
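
A worked example, assuming a 4 GB executor heap and the Spark 1.5 defaults spark.shuffle.memoryFraction = 0.2 and spark.shuffle.safetyFraction = 0.8:

val executorHeapBytes = 4L * 1024 * 1024 * 1024   // 4 GB heap (assumed for this example)
val shuffleMemoryFraction = 0.2                   // spark.shuffle.memoryFraction (default)
val shuffleSafetyFraction = 0.8                   // spark.shuffle.safetyFraction (default)

val maxShuffleReadBytes =
  (executorHeapBytes * shuffleSafetyFraction * shuffleMemoryFraction).toLong
// ≈ 655 MB in total, shared by all reducer tasks running concurrently in this executor
// (at most one per core), so each task gets roughly 655 MB / cores before it must spill.
println(s"max shuffle read memory: ${maxShuffleReadBytes / (1024 * 1024)} MB")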