Spark task的输入太大如何分割?

上周调优一个job 的时候发现一个join 意外的耗时间,如图上一个join的shuffle操作就耗时1.2h. Input 才91GB, shuffle write 525.5GB. 但是花了1.2h.

pic1

看看里面的task , Median 就是42min, max 1.2h . 虽然明显是有拖尾的现象,但是Median就42min ,不是skew 造成的。 Median 的输入是203M,Shuffle write 1171MB, Shuffle Spill(Memory) 14.6G, Shuffle Spill(Disk) 2.2GB.看上去是executor.memory 不够才造成的shuffle的spill 耗时间。


pic2

 解决思路:1. 把单个input 变小 2. 调大executor memory . 这次用第一个没有用第二个的原因是这个application 有很多query,如果调大executor memory, 那么其他query 事实上不需要那么大的memory也会跟着一起用较大的executor memory setting.

首先尝试了spark.sql.files.maxPartitionBytes=33554432 , 把单个partition最大读数据量控制在32M。但是执行时间依然是这样,从history server 上看每个task 的输入数据还是200MB ,没有变化。

val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")

  .doc("The maximum number of bytes to pack into a single partition when reading files.")

  .longConf

  .createWithDefault(128 * 1024 * 1024) // parquet.block.size


重新去看源码,源码:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L570

每个task接受的input 的确可以通过spark.sql.files.maxPartitionBytes 来控制,但是对于文件格式是有要求的,fsRelation.fileFormat.isSplitable为true才能根据参数分割输入,isSplittable的源码是这样:输入是否能分割和文件格式text,parquet,orc,json没有关系,只和文件格式对应的压缩算法(下面的codec)有关系。

abstract class TextBasedFileFormat extends FileFormat {

  private var codecFactory: CompressionCodecFactory = _

  override def isSplitable(

      sparkSession: SparkSession,

      options: Map[String, String],

      path: Path): Boolean = {

    if (codecFactory == null) {

      codecFactory = new CompressionCodecFactory(

        sparkSession.sessionState.newHadoopConfWithOptions(options))

    }

    val codec = codecFactory.getCodec(path)

    codec == null || codec.isInstanceOf[SplittableCompressionCodec]

  }

}

常见的压缩算法是GZIP,LZO,SNAPPY, GZIP是不能分割的,也就是说一个文件(gz结尾)1GB,怎么设置参数,每个task的输入还是1GB.

看到这里可以写个spark单元测试来验证一下这个spark.sql.files.maxPartitionBytes,先设置成2 , 这个单元测试的用意是看在读数据的时候,spark是否把每个task的输入控制在2byte. 首先在/tmp/seq 下写了一份数据。看了下的确产生了12个partition. 然后读数据的时候看到底用了多少partitions. 看了下最后用了147个分片来读数据。说明这个参数是有效果的。

override def conf: SparkConf = {

    val conf = super.conf

    conf.set("spark.sql.files.maxPartitionBytes","2")

    conf

  }

test("test spark.sql.files.maxPartitionBytes "){

val df1 = spark.range(0, 100).selectExpr("CAST(id AS STRING) AS s")

//original 12partitions

df1.write.mode("overwrite").format("text").text("/tmp/seq")

val df2 = spark.read.format("text")load("/tmp/seq")

println("size:"+getFileScanRDD(df2).filePartitions.size) // 147 partitions

}


重新写个单元测试,写数据的时候用GZIP 压缩下, 看看读数据的时候能否还是用147个partition去读,跑完你就可以发现原来写的时候写了12个partition,现在读文件还是12个partition. 这个时候这个参数根本没有用

override def conf: SparkConf = {

    val conf = super.conf

    conf.set("spark.sql.files.maxPartitionBytes","2")

    conf

  }

test("test spark.sql.files.maxPartitionBytes "){

val df3 = spark.range(0, 100).selectExpr("CAST(id AS STRING) AS s")

    //original 12partitions

df3.write.option("compression", "gzip").mode("overwrite").format("text").text("/tmp/compressed")

val df4 = spark.read.format("text")load("/tmp/compressed")

println("size:"+getFileScanRDD(df4).filePartitions.size) //size:12

}


回头再看最前面的那个问题,原来之所以参数设置了不起效果,就是因为原来的表的格式是SequenceFile且它的压缩格式是GZIP。解决方法重新生成表让它不带压缩的格式或者你选个可以分割的压缩算法就能让后面读数据的时候spark.sql.files.maxPartitionBytes 生效了。来看看效果 Median 34s 即使task的个数从原来的459个变成了3655个,每个task 的输入从原来的200M变成了32M, 减小了6倍,task个数变成了原来的7倍(的确没有压缩的话表的确是在空间上是变大了一点)。 但是每个task的执行时间从原来的42min变成了31s. job 的执行时间肯定是大幅度下降( job 跑得快快的)


pic 3

总结一下整个文章想表达的意思

1.  task 输入太大会对shuffle造成spill 到disk 的额外耗时(当然你的executor memory 如果足够大是不会出现这个问题的),请注意这个case 的输入才200M,也不大,但是对shuffle 也造成要spill到disk 的压力。原因就是输入是使用了压缩算法(GZIP), 200M的输入放到内存可以预计扩大到3~5倍(见下面每个压缩算法的压缩率),也就是600M~1G.

2. 可以通过设置spark.sql.files.maxPartitionBytes 来分割每个task 的输入。 但是配合不同的压缩算法,压缩算法是否可以被分割又决定了输入是否可以被分割。

3. 如何看你的输入是使用了什么分割算法,看分片文件的后缀。下面这个分片是GZIP格式。

part-00000-76a8013e-8a5e-4c7d-8ae6-09368920561b-c000.txt.gz。

4. 总结一下各个压缩算法的压缩率和是否可以被分割,从下图可以看到 GZIP的压缩率的确是最高的,但是GZIP是不可以分割的。


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容

  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,448评论 0 13
  • 【Spark集群并行度】 在Spark集群环境下,只有足够高的并行度才能使系统资源得到充分的利用,可以通过修改sp...
    遥远的彼岸12阅读 575评论 0 0
  • 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的...
    Alukar阅读 870评论 0 2
  • 电扇静静地坐着这里,还是那个老样子,似乎等着我跟它说话,那我说什么呢?说:“老家伙,你动动吧。”还是说:“老家伙...
    禅茶一味_0aaf阅读 179评论 0 0
  • 网上之前一直流传一句话“deadline是第一生产力”,考试,写论文,申报资料,项目结题,公司任务,个人事务……无...
    闲云001阅读 262评论 0 0