Spark RDD 之 Partition

Spark RDD

怎么理解 RDD 的粗粒度模式 ?对比 细粒度模式

Spark RDD 的 task 数量是由什么决定的?
一份待处理的原始数据会被按照相应的逻辑(例如jdbc和hdfs的split逻辑)切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度

支持保存点(checkpoint)
虽然RDD可以通过lineage实现fault recovery,但是这个恢复可能是很耗时的,因此提供保存点很有必要,通常保存点在有宽依赖时(shuffle耗时)很有用,相反,窄依赖时则不值得使用

Spark RDD 之 Partition

参考:https://blog.csdn.net/u011564172/article/details/53611109

分析Partition主要是分析其子类。我们关注两个常用的子类,JdbcPartition和HadoopPartition。
此外,RDD源码中有5个方法,代表其组成,如下:


getPartitions 是数据源如何被切分的逻辑,返回值是Partition组成的数组,第一个方法compute是消费切割后的Partition的方法,所以学习Partition,要结合getPartitions和compute方法。

JdbcPartition

在 JdbcRDD.scala 文件中,定义了两个类:JdbcPartition 和 JdbcRDD,和一个对象:JdbcRDD。如图所示:


先看下 class JdbcRDD 的定义:

class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging {
  ...
}

然后举一个 JdbcRDD 的例子,每一个参数都和上面的参数对应:

val sc = new SparkContext("local[1]", "test") 
val rdd = new JdbcRDD(sc,
    () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, 
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 
    1, 100, 3, 
    (r: ResultSet) => { r.getInt(1) } ).count()

我们再看下 JdbcPartition 的定义:

private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
  override def index: Int = idx
}

private[spark],用来限制访问权限,表示只有 org.apache.spark 包下的类可以访问 class JdbcPartition 。

JdbcRDD 类继承了 RDD,并重写了 getPartitions 方法,我们看下getPartitions 的实现,其返回了 JdbcPartition,如图所示:



我们分析下 上图中的 getPartitions,按照如上图所示算法将1到100分为3份(partition数量),结果为(1,33)、(34,66)、(67,100),封装为JdbcPartition并返回,这样数据切分的部分就完成了。

再看下 JdbcRDD 的compute方法:



逻辑清晰,将Partition强转为JdbcPartition,获取连接并预处理sql,将
例子中的”SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?”问号分别用Partition的lower和upper替换(即getPartitions切分好的(1,33)、(34,66)、(67,100))并执行查询。至此,JdbcPartition如何发挥作用就分析完了。

HadoopPartition

举个简单例子:

val sc = new SparkContext("local[1]", "test")
sc.textFile("hdfs://your-file-path").count()

略。。。

决定partition数量的因素

Partition数量可以在初始化RDD时指定(如JdbcPartition例子),不指定的话(如HadoopPartition例子),则
读取spark.default.parallelism配置,不同类型资源管理器取值不同,如下


比较计算得到的goalSize和block大小blockSize,取其中较小者,再和minSize(由属性mapreduce.input.fileinputformat.split.minsize确定,默认值为0,则minSize默认值为1)取较大的。
假设待处理文件大小fSize=512M(视为一个大文件,不考虑1.1系数),block大小bSize=128M,

  1. sc.textFile(…, 3)
    根据上面的公式goalSize=512M/3 > bSize=128M
    取其较小者bSize,则按照bSize切分,split数=512M/128=4,即partition数=4

  2. sc.textFile(…, 5)
    根据上面的公式goalSize=512M/5 < bSize=128M
    取其较小者goalSize,则按照goalSize切分,split数=512M/(512M/5)=5,即partition数=5

可见指定numPartitions,小于block数时无效,大于则生效。

上面分析了决定Partition数量的因数,接下来就该考虑Partition数量的影响以及合适的值。

  • Partition数量的影响

    1. Partition数量太少
      太少的影响显而易见,就是资源不能充分利用,例如local模式下,有16core,但是Partition数量仅为8的话,有一半的core没利用到。
    2. Partition数量太多
      太多,资源利用没什么问题,但是导致task过多,task的序列化和传输的时间开销增大。

    那么多少的partition数是合适的呢,这里我们参考spark doc给出的建议,Typically you want 2-4 partitions for each CPU in your cluster

  • Partition调整

    1. repartition
      reparation是coalesce(numPartitions, shuffle = true),repartition不仅会调整Partition数,也会将Partitioner修改为hashPartitioner,产生shuffle操作。
    2. coalesce
      coalesce函数可以控制是否shuffle,但当shuffle为false时,只能减小Partition数,无法增大。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容