Spark shuffle机制概述

shuffle及Spark shuffle历史简介

shuffle,中文意译“洗牌”,是所有采用map-reduce思想的大数据计算框架的必经阶段,也是最重要的阶段。它处在map与reduce之间,又可以分为两个子阶段:

  • shuffle write:map任务写上游计算产生的中间数据;
  • shuffle read:reduce任务读map任务产生的中间数据,用于下游计算。

下图示出在Hadoop MapReduce框架中,shuffle发生的时机和细节。

Hadoop MR过程

Spark的shuffle机制虽然也采用MR思想,但Spark是基于RDD进行计算的,实现方式与Hadoop有差异,并且中途经历了比较大的变动,简述如下:

  • 在久远的Spark 0.8版本及之前,只有最简单的hash shuffle,后来引入了consolidation机制;
  • 1.1版本新加入sort shuffle机制,但默认仍然使用hash shuffle;
  • 1.2版本开始默认使用sort shuffle;
  • 1.4版本引入了tungsten-sort shuffle,是基于普通sort shuffle创新的序列化shuffle方式;
  • 1.6版本将tungsten-sort shuffle与sort shuffle合并,由Spark自动决定采用哪一种方式;
  • 2.0版本之后,hash shuffle机制被删除,只保留sort shuffle机制至今。

下面的代码分析致力于对Spark shuffle先有一个大致的了解。

shuffle机制的最顶层:ShuffleManager特征

鉴于shuffle的重要性,shuffle机制的初始化在Spark执行环境初始化时就会进行。查看SparkEnv.create()方法:

  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  // 通过反射创建ShuffleManager
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

上面的代码可以印证hash shuffle已经成为历史了。另外,还可以通过spark.shuffle.manager参数手动指定shuffle机制,不过意义不大。

o.a.s.shuffle.ShuffleManager是一个Scala特征(相当于Java接口的增强版)。其中定义的核心方法有3个:

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]
  • registerShuffle()方法用于注册一种shuffle机制,并返回对应的ShuffleHandle(类似于句柄),handle内会存储shuffle依赖信息。根据该handle可以进一步确定采用的ShuffleWriter/ShuffleReader的种类。
  • getWriter()方法用于获取ShuffleWriter。它是executor执行map任务时调用的。
  • getReader()方法用于获取ShuffleReader。它是executor执行reduce任务时调用的。

sort shuffle机制概况:SortShuffleManager类

在hash shuffle取消后,o.a.s.shuffle.sort.SortShuffleManager就是ShuffleManager目前唯一的实现类。来看它对上面提到的三个方法的具体实现。

registerShuffle()方法

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

可以看出,根据条件的不同,会返回3种不同的handle,对应3种shuffle机制。从上到下来分析一下:

  1. 检查是否符合SortShuffleWriter.shouldBypassMergeSort()方法的条件:
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }

也就是说,如果同时满足以下两个条件:

  • 该shuffle依赖中没有map端聚合操作(如groupByKey()算子)
  • 分区数不大于参数spark.shuffle.sort.bypassMergeThreshold规定的值(默认200)

那么会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制。

  1. 如果不启用上述bypass机制,那么继续检查是否符合canUseSerializedShuffle()方法的条件:
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(/*...*/)
      false
    } else if (dependency.aggregator.isDefined) {
      log.debug(/*...*/)
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(/*...*/)
      false
    } else {
      log.debug(/*...*/)
      true
    }
  }
}

也就是说,如果同时满足以下三个条件:

  • 使用的序列化器支持序列化对象的重定位(如KryoSerializer)
  • shuffle依赖中完全没有聚合操作
  • 分区数不大于常量MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值(最大分区ID号+1,即2^24=16777216)

那么会返回SerializedShuffleHandle,启用序列化sort shuffle机制(也就是tungsten-sort)。

  1. 如果既不用bypass也不用tungsten-sort,那么就返回默认的BaseShuffleHandle,采用基本的sort shuffle机制。

getWriter()方法

  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

根据不同的handle,获取不同的ShuffleWriter。对于tungsten-sort会使用UnsafeShuffleWriter,bypass会使用BypassMergeSortShuffleWriter,普通的sort则使用SortShuffleWriter。它们都继承自ShuffleWriter抽象类,并且都实现了write()方法。

getReader()方法

  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

ShuffleReader比较简单,只有一种,即BlockStoreShuffleReader。它继承自ShuffleReader特征,并实现了read()方法。

总结

以ShuffleManager展开的简单UML类图
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352