Spark 数据读取冷启动优化分析

有时候会发现即使是读取少量的数据,启动延时可能也非常大,针对该现象进行分析,并提供一些解决思路。

背景

Spark 一次查询过程可以简单抽象为 planning 阶段和 execution 阶段,在一个新的 Spark Session 中第一次查询某数据的过程称为冷启动,在这种情况下 planning 的耗时可能会比 execution 更长。

Spark 读取数据冷启动时,会从文件系统中获取文件的一些元数据信息(location,size,etc.)用于优化,如果一个目录下的文件过多,就会比较耗时(可能达到数十分钟),该逻辑在 InMemoryFieIndex 中实现。

后续再次多次查询则会在 FileStatusCache 中进行查询,planning 阶段性能也就大幅提升了,下文将探讨 planning 阶段如何加载元数据以及可能的一些优化点。

InMemoryFileIndex

before spark 2.1

spark 2.1 版本前,spark 直接从文件系统中查询数据的元数据并将其缓存到内存中,元数据包括一个 partition 的列表和文件的一些统计信息(路径,文件大小,是否为目录,备份数,块大小,定义时间,访问时间,数据块位置信息)。一旦数据缓存后,在后续的查询中,表的 partition 就可以在内存中进行下推,得以快速的查询。

将元数据缓存在内存中虽然提供了很好的性能,但也存在2个缺点:在 spark 加载所有表分区的元数据之前,会阻塞查询。对于大型分区表,递归的扫描文件系统以发现初始查询文件的元数据可能会花费数分钟,特别是当数据存储在云端。其次,表的所有元数据都需要放入内存中,增加了内存压力。

after spark 2.1

spark 2.1 针对上述缺点进行了优化,可参考 SPARK-17861

  • 将表分区元数据信息缓存到 catalog 中,例如 (hive metastore),因此可以在 PruneFileSourcePartitions 规则中提前进行分区发现,catalyse optimeizer 会在逻辑计划中对分区进行修剪,避免读取到不需要的分区文件信息。
  • 文件统计现在可以在计划期间内增量的,部分的缓存,而不是全部预先加载。Spark需要知道文件的大小以便在执行物理计划时将它们划分为读取任务。通过共享一个固定大小的250MB缓存(可配置),而不是将所有表文件统计信息缓存到内存中,在减少内存错误风险的情况下显著加快重复查询的速度。

旧表可以使用 MSCK REPAIR TABLE 命令进行转化,查看是否生效,如果 Partition ProviderCatalog 则表示会从 catalog 中获取分区信息

sql("describe formatted test_table")
.filter("col_name like '%Partition Provider%'").show
+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|Partition Provider:|  Catalog|       |
+-------------------+---------+-------+

性能对比
出自官方blog,通过读取一张表不同的分区数,观察任务 execution time 和 planning time,在spark2.1之前 planning 阶段的耗时是相同的,意味着读取一个分区也需要扫描全表的 file status。

B1951C19-4FC3-457A-9E41-61B2C547950E.png

优化 HDFS 获取 File 元数据性能

虽然优化了避免加载过多元数据的问题,但是单个分区下文件过多导致读取文件元数据缓慢的问题并没有解决。

SPARK-27801 中(将在 spark3.0 release),对一个目录下多文件的场景进行了优化,性能有大幅度的提升。

使用 DistributedFileSystem.listLocatedStatus 代替了 fs.listStatus + getFileBlockLocations的方式

  • listLocatedStatus 向 namenode 发起一次请求获得 file statusfile block location 信息

  • listStatus 获取一系列的 file status 后,还要根据 file status 循环向 namenode 发起请求获得 file block location信息

listLocatedStatus

// 对 namenode 只发起一次 listLocatedStatus 请求,在方法内部获得每个文件 block location 信息
val statuses = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
  def next(): LocatedFileStatus = remoteIter.next
  def hasNext(): Boolean = remoteIter.hasNext
}.toArray
statuses.flatMap{
  Some(f)
}

fs.listStatus + getFileBlockLocations (只展示核心代码)

val statuses = fs.listStatus(path)
statuses.flatMap{
  val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
    if (loc.getClass == classOf[BlockLocation]) {
        loc
    } else {
        new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
    }
    }
  val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
  if (f.isSymlink) {
    lfs.setSymlink(f.getSymlink)
  }
  Some(lfs)
}

性能对比

实测一个57个分区,每个分区1445个文件的任务,性能提升6倍左右

打入 SPARK-27801 前
打入 SPARK-27801 后

文件元数据读取方式及元数据缓存管理

  1. 读取数据时会先判断分区的数量,如果分区数量小于等于spark.sql.sources.parallelPartitionDiscovery.threshold (默认32),则使用 driver 循环读取文件元数据,如果分区数量大于该值,则会启动一个 spark job,分布式的处理元数据信息(每个分区下的文件使用一个task进行处理)
  2. 分区数量很多意味着 Listing leaf files task 的任务会很多,分区里的文件数量多意味着每个 task 的负载高,使用 FileStatusCache 缓存文件状态,默认的缓存 spark.sql.hive.filesourcePartitionFileCacheSize 为 250MB

Tip
Listing leaf files task 的数量计算公式为

val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

其中,paths.size 为需要读取的分区数量,parallelPartitionDiscoveryParallelism 由参数 spark.sql.sources.parallelPartitionDiscovery.parallelism 控制,默认为10000,目的是防止 task 过多,但从生产任务上观察发现大多数 get status task 完成的时间都是毫秒级,可以考虑把这个值调低,减少任务启动关闭的开销,或者直接修改源码将 paths.size 按一定比例调低,例如 paths.size/2

控制 task 数量之前
控制 task 数量之后

结语

spark 查询冷启动(获取文件元数据性能)对比前几个版本已经有非常大提升,降低了查询的延时

  • SPARK-17861 在物理计划中进行了优化,通过将分区信息存入 catalog ,避免了读取时加载全量表的文件信息

  • SPARK-27801 优化读取 hdfs 文件元数据的方式,之前 getFileBlockLocations 的方式是串行的,在文件数量很多的情况下速度会很慢,同时用 listLocatedStatus 的方式减少了客户端对 namenode 的直接调用,例如需要读取的数据为3个分区,每个分区 10k 个文件,之前客户端需要访问 namenode 的次数为30k,现在为3次

  • 打入最新的 patch 和 优化 task 数量后,随机找的一个生产任务 Listing Leaf files job 时间从数十秒减少到1S以内,不过有时候依旧存在毛刺,这与 namenode 和 机器的负载程度有关

一些思考,是否可以考虑用 Redis 替换 FileStatusCache,在数据写入的时候更新 Redis 中的 file status 信息,这样就相当于所有的 spark 应用共享了 FileStatusCache ,减少了内存使用的同时也不再有读数据冷启动的问题了。

参考

scalable-partition-handling-for-cloud-native-architecture-in-apache-spark-2-1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355