Spark Core源码精读计划#19:RDD的依赖与分区逻辑

目录

前言

按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问到的基础问题(反正我是会问的),因此看官也可以将本文当做一篇面试知识点解析来看。

RDD依赖

Dependency抽象类及子类

在Spark Core中,RDD依赖关系的基类就是Dependency抽象类。它的定义只有一句话。

代码#19.1 - o.a.s.Dependency抽象类

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

该类中只定义了一个方法rdd(),它用于取得当前RDD依赖的父RDD。Dependency及其子类的类图如下所示。

图#19.1 - Dependency继承体系

我们已经知道,RDD依赖分为窄依赖和宽依赖(Shuffle依赖)两种,下面分别来看。

窄依赖

所谓窄依赖,是指父RDD的每个分区都仅被子RDD的一个分区所依赖,也就是说子RDD的一个分区固定对应一个父RDD的单个分区。窄依赖在代码中的基类是NarrowDependency抽象类。

代码#19.2 - o.a.s.NarrowDependency抽象类

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

可见,NarrowDependency类带有一个构造方法参数_rdd,并重写rdd()方法让其返回之,它就是当前RDD依赖的父RDD。另外,它还定义了一个抽象方法getParents(),用来返回partitionId对应分区依赖的所有父RDD的分区ID。该方法由NarrowDependency的子类实现,分别为OneToOneDependency(一对一依赖)和RangeDependency(范围依赖),它们的代码都比较简单。

代码#19.3 - o.a.s.OneToOneDependency与RangeDependency类

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

可见,它们返回的都是只有一个元素的List,且OneToOneDependency的父子RDD分区ID严格相同,常见的map()或filter()等算子都会产生OneToOneDependency。而在RangeDependency中,子RDD中ID为partitionId的分区与父RDD中ID为(partitionId - outStart + inStart)的分区一一对应,其中inStart为父RDD分区ID的起始值,outStart为子RDD分区ID的起始值,例如union()算子就会产生这种依赖。

上面讲的两种情况都是一一对应关系。当子RDD分区对应多个父RDD的分区(如join()算子)时,也可以形成窄依赖。其前提是父子RDD的分区规则完全相同,即子RDD的某个分区p对应父RDD 1的分区p,也对应父RDD 2的分区p。如果分区规则不同,就会变成宽依赖。

下面的图来自网络,原始出处已不可考,比较形象地说明了窄依赖。

图#19.2 - 窄依赖的三种情况

宽依赖

严格来讲,它的名字应该叫“Shuffle依赖”,因为在Spark代码中,它的类名是ShuffleDependency。不过在中文圈子里,“宽依赖”这个名字也同样通用。它就是指子RDD的一个分区会对应一个父RDD的多个分区,并且往往是全部分区。ShuffleDependency类的代码如下。

代码#19.4 - o.a.s.ShuffleDependency类

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于Shuffle是调度系统的重要组成部分,因此我们这里只限于读ShuffleDependency的源码,做一些基本介绍,大头留到后面去说。

ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。如果看官有Hadoop MapReduce的基础,对Combiner这个词自然不会陌生。由于Shuffle过程对键值型数据才有意义,因此ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。

其他构造方法参数说明如下:

  • partitioner:分区器,下面马上就会讲到。
  • serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。
  • keyOrdering:可选的对键类型K排序的排序规则。
  • aggregator:可选的Map端数据聚合逻辑。
  • mapSideCombine:指定是否启用Map数据预聚合。

随着宽依赖的创建,还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager.registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)。虽然在很久之前的几篇文章中讲过Shuffle相关的话题,但是在真正讲到调度系统时,还是会继续深挖的。

下面的图形象地说明了宽依赖。

图#19.3 - 宽依赖的两种情况

RDD分区器

Partitioner抽象类与伴生对象

在上一篇文章讲RDD时,Partitioner就已经出现了,并且它在上一节的ShuffleDependency代码中也是作为构造参数出现。在Shuffle过程中,必须得有确定的计算逻辑来决定父RDD的分区数据如何分配并对应到子RDD的分区中,这就是分区器Partitioner的职责。

Partitioner抽象类的定义也很简单。

代码#19.5 - o.a.s.Partitioner抽象类

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

numPartitions()方法返回分区总数,而getPartitions()方法根据键返回其将被映射到的分区ID。

Partitioner还带有一个伴生对象,其中定义了defaultPartitioner()方法,顾名思义,它(根据上游的一个或一些RDD)返回默认的分区逻辑,其代码如下。

代码#19.6 - o.a.s.Partitioner.defaultPartitioner()/isEligiblePartitioner()方法

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }

  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }

该方法会从输入的所有RDD中取出那些定义了分区逻辑的RDD,然后找到其中分区数最大的那个Partitioner。如果SparkConf中定义了缺省并行度配置项,即spark.default.parallelism,那么默认分区器的分区数就会采用该参数的值,否则就直接用所有RDD中最大的分区数(这就是为什么几乎总是推荐在提交Spark作业时设定spark.default.parallelism)。

然后,调用isElegiblePartitioner()方法,判断分区数最大的那个Partitioner是否“合格”,判断逻辑是其分区数与所有上游RDD中最大分区数之差小于一个数量级。如果通过检查,并且默认分区数比它小,就采用分区数最大的那个Partitioner作为分区逻辑,否则用默认分区数构造一个新的HashPartitioner并返回。

Partitioner在Spark Core中的实现类主要有两个:基于散列的HashPartitioner和基于采样范围的RangePartitioner,前者是默认实现。下面我们就以HashPartitioner为例来看看其具体实现(RangePartitioner太麻烦了)。

HashPartitioner

代码#19.7 - o.a.s.HashPartitioner类

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

可见,在重写的getPartition()方法中,会取得键的hashCode值,对分区数numPartitions取模,返回其绝对值,这样就确保映射到的分区落在[0,numPartitions - 1]的区间内。为了判断两个HashPartitioner是否相等,也必须同时重写其equals()和hashCode()方法,判断标准自然就只有分区数了。

我们也可以很容易地想到,用户通过自己继承Partitioner类,能够自定义分区逻辑。下面就是一个简单的示例,它通过Key的长度来分区。由于它不属于Spark源码,就不编号了。

class KeyLengthPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    return key.asInstanceOf[String].length() & (partitions - 1)
  }
}

总结

本文分别以Spark Core中的Dependency与Partitioner两个抽象类为起点,比较详细地讲解了Spark中RDD依赖关系与分区逻辑的具体设计。依赖与分区是RDD五要素中最重要的两个点,在今后的源码阅读过程中,会经常用到它们。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 230,825评论 6 546
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,814评论 3 429
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 178,980评论 0 384
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 64,064评论 1 319
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,779评论 6 414
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 56,109评论 1 330
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 44,099评论 3 450
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 43,287评论 0 291
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,799评论 1 338
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,515评论 3 361
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,750评论 1 375
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 39,221评论 5 365
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,933评论 3 351
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 35,327评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,667评论 1 296
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,492评论 3 400
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,703评论 2 380