Spark SQL Limit 介绍及优化

一、概念

1.1、GlobalLimit

case class GlobalLimit(limitExpr: Expression, child: LogicalPlan)

全局限制,最多返回 limitExpr 对应条 records。总是通过 IntegerLiteral#unapply(limitExpr: Expression): Option[Int] 将 limitExpr 转换为 Int。

1.2、LocalLimit

case class LocalLimit(limitExpr: Expression, child: LogicalPlan)

分区级限制(非全局),限制每个物理分区最多返回 limitExpr 对应条 records。同样通过 IntegerLiteral#unapply 得到 limitExpr 对应 Int 值。

当需要限制全局返回至多 n 条数据时,

GlobalLimit n
+- LocalLimit n

如上,通过 LocalLimit n 限制每个分区至多返回 n 条 records,再通过 GlobalLimit n 限制各个分区总体至多返回 n 条 records。

在分布式查询中,将 limit 下推到分区级往往比推到全局级有更好的性能,因为可以减少数据的返回(网络传输),比如对于 GlobalLimit(Union(A, B))

GlobalLimit(Union(LocalLimit(A), LocalLimit(B))) 是比 Union(GlobalLimit(A), GlobalLimit(B)) 更好的下推方式。

二、Optimizer 中 Limit 相关的 Rules

Rule 原则:改变 plan 结构,但不改变结果

2.1、LimitPushDown

下推 LocalLimitUNIONLeft/Right Outer JOIN之下:

  • 对于 Union:若 Union 的任一一边 child 不是一个 limit(GlobalLimit 或 LocalLimit)或是一个 limit 但 limit value 大于 Union parent 的 limit value,以一个 LocalLimit (limit value 为 Union parent limit value)作为 child 的 parent 来作为该边新的 child
  • 对于 Outer Join:对于 Left Outer Join,若 left side 不是一个 limit(GlobalLimit 或 LocalLimit)或是一个 limit 但 limit value 大于 Join parent 的 limit value,以一个 LocalLimit(limit value 为 Join parent limit value) 作为 left side 的 parent 来作为新的 left side;对于 Right Outer Join 同理,只是方向不同

2.1.1、Union: limit to each side

GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalRelation <empty>, [d#3, e#4, f#5]
      
GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalLimit 1
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalLimit 1
         +- LocalRelation <empty>, [d#3, e#4, f#5]

2.2.2、Union: limit to each sides if children having larger limit values

GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalRelation <empty>, [a#0, b#1, c#2]
      +- GlobalLimit 3
         +- LocalLimit 3
            +- LocalRelation <empty>, [d#3, e#4, f#5]

GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalLimit 1
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalLimit 1
         +- LocalLimit 3
            +- LocalRelation <empty>, [d#3, e#4, f#5]

注: Rule CombineLimits 会进一步优化

2.2.3、Left outer join: limit to left side

'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Join LeftOuter
      :- SubqueryAlias x
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- SubqueryAlias y
         +- LocalRelation <empty>, [a#0, b#1, c#2]

GlobalLimit 1
+- LocalLimit 1
   +- Join LeftOuter
      :- LocalLimit 1
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalRelation <empty>, [a#6, b#7, c#8]
  • Rule EliminateSubqueryAliases:使用 SubqueryAlias 的 child 替换 SubqueryAlias
  • 同样适用于 right outer join

2.2.4、Right outer join: limit to right side if right side having larger limit value

'GlobalLimit 3
+- 'LocalLimit 3
   +- 'Join RightOuter
      :- SubqueryAlias x
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- GlobalLimit 5
         +- LocalLimit 5
            +- SubqueryAlias y
               +- LocalRelation <empty>, [a#0, b#1, c#2]

GlobalLimit 3
+- LocalLimit 3
   +- Join RightOuter
      :- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalLimit 3
         +- LocalLimit 5
            +- LocalRelation <empty>, [a#6, b#7, c#8]

2.2、CombineLimits

合并两个临近(parnet 和 child)的 limit(GlobalLimit、LocalLimit、Limit) 为一个,limit value 取小的那个

GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalLimit 1
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalLimit 1
         +- LocalLimit 3
            +- LocalRelation <empty>, [d#3, e#4, f#5]
            
GlobalLimit 1
+- LocalLimit 1
   +- Union
      :- LocalLimit 1
      :  +- LocalRelation <empty>, [a#0, b#1, c#2]
      +- LocalLimit 1
           +- LocalRelation <empty>, [d#3, e#4, f#5]

三、现状的收益与缺陷

3.1、缺陷及改进

3.1.1、limit 未下推到存储层

上述 limit 相关的 rules,并没有把 limit 下推到存储,这样并不会减少最初生成的 RDD 返回的各个分区对应的数据量,在我们的应用场景总中,计算集群和存储集群都是独立部署,在最初的 stage 中的 mapTask 都是通过网络去拉取 parquet 数据,这往往是代价、耗时最高的操作。而实际上,对于很多 limit 场景,并不需要完整的 partition 数据,只需要 n 条

3.1.2、获取结果时的 partitions 扫描策略不合理

limit 操作最终会调用 SparkPlan#executeTake(n: Int) 来获取至多 n 条 records,其内部可能会执行多次 runJob,具体流程如下:

默认情况下每次 runJob 扫描的 partitions 数:

1
4
20
100
500
2500
6875

存在的问题:

  1. 初期扫描的 partitions 数太少,往往需要多个批次才能达到 limit n
  2. 后期每个批次扫描的 partitions 过多,对应的 job耗时较长
  3. 如要扫描多个批次才能达到 limit n,对于下一个批次需要等上一个批次完成才能开始运行,累计的等待时间过长

改进:

  • 以 n 个并发同时扫描多个 partitions,每完成一个 job,立即新增一个 job
  • 这样使得初期扫描的 partitions 数大大增加,由于是并发执行多个 runJob,在相同的时间内能获取到更多的 records 填充到 buf 中
  • 每执行 n 个 jobs 后,每个并发扫描的 partitions 也根据可配置的增长率进行增长,避免要扫描大量 partitions 才能拿到结果需要运行过多的 jobs

3.2、收益

虽然上述 rules 没有将 limit 下推到存储,但也将 limit 下推到相对更底层的 plan,这使得要基于该 plan 做的操作拉取和处理的数据量更小(如 LimitPushdown、CombineLimits 例子中展示)

四、下推 limit 到存储

下推到存储在 plan 层目的是让最开始生成的 RDD 各分区包含尽量少的数据,对于 limit 来说就是要让最开始的 RDD 的各分区至多包含 limit n 条记录。最开始的 RDD 也即读取 parquet 生成的 RDD。

4.1、Parquet RDD 如何生成

parquet on hdfs 是一个没有计算能力的存储方案,目前不支持直接下推 limit 给 parquet,但在一些场景下可以实现让最开始直接读取 parquet 返回的 RDD 的各个 partition 至多返回 limit n 条数据,先来看看这个 RDD 是如何生成的

4.1.1、应用 FileSourceStrategy 来获取 scan

SparkPlanner 应用一系列策略于 Optimized Logical Plan 来生成 Physical Plan,FileSourceStrategy 就是其中的一个策略,主要用于扫描由 sql 指定列、分区的文件集合。其主要流程如下:

名词解释:

Project:投影,要 SELECT 的东西,比如 SELECT a, a+b, udf(c) FROM tb 中的 a, a+b, udf(c) 组合起来为Project 的 projectList: Seq[NamedExpression],和 child 一起组成了 Project:Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode

PhysicalOperation

匹配一个 LogicalPlan 上套了任意个 project 或 filter 操作(连续的)

  • 会将所有 filters 的 conditions 分割(若用 And 连接)组成新的 filters: Seq[Expression]
  • 以最底层 Project 的 fields 作为最终返回的 fields
  • 以最底层 aliases 作为最终返回的 aliases
  def FileSourceStrategy#apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projects, filters,
      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) => ...
  }

所以 FileSourceStrategy 策略能应用的 plan 必定是 relation 为 HadoopFsRelation 的 LogicalRelation 上套了连续的任意个 Project 或 Filter。

上图流程中创建了 scan: FileSourceScanExec,该类是一个用于扫描 HadoopFsRelation 的物理执行计划节点。

名词解释:

HadoopFsRelation:包含读取一个基于文件的表所需的所有元数据,如:

  • 经过 partition filters、data filters 过滤的要读取的 partition -> 文件列表 对应关系
  • partition schema
  • data schema
  • file format
  • 表大小(in bytes)
  • 如何分桶(only for bucket table)
  • options

上述流程主要:

  • 将 filters 根据 attributes 是否包含 partition attribute 分为 partitionFilters 和 dataFilter
  • 计算
    • outputAttributes:filters 中包含的 attributes ++ projects
    • outputSchema:非 partition filters 中包含的 attributes ++ projects
  • 使用 partitionKeyFilters、dataFilters、outputAttributes、outputSchema 等构造 FileSourceScanExec。z
  • 若 afterScanFilters 不为空,则需要在 FileSourceScanExec 套一个 FilterExec 座位 parent,即对 FileSourceScanExec 返回的数据需要再做一次 filter(使用 afterScanFilters 包含的各 filters 的 conditions 组合,用 And 连接)
  • 若上一步的 output 与 projects 不一致,还需再套一个 ProjectExec 座位 parent,即再做一次 Project 操作

4.1.2、FileSourceScanExec 生成 RDD

最终会调用lazy 的 inputRDD 成员 来获取 RDD[InternalRow] ,主要包含两个步骤:

4.1.2.1、构造 readFile: (PartitionedFile) => Iterator[InternalRow] 函数变量

通过调用 ParquetFileFormat#buildReaderWithPartitionValues(...): (PartitionedFile) => Iterator[InternalRow] 来获取 readFileFunc

主要就是:

  • 设置将要 scan 的 cols schema
  • 将 catalyst filters 转换为 parquet filter(不是所有的都能转换)
  • 根据是否启用矢量化读取构造 parquetReader
  • 将 cols schema、pushdown data filters 直接或通过配置设置给 parquetReader
  • 使用 reader 构造最终的迭代器,并转化为 Iterator[InternalRow] 类型返回

再看 create rdd 前,我们先来看 FileSourceScanExec#selectedPartitions: Seq[PartitionDirectory] 方法,该方法调用 relation.location.listFiles(partitionFilters, dataFilters) 根据指定的 partition、data filters 过滤不需要扫描的 partitions,只返过滤后的 partitions(一个 PartitionDirectory 对应一个 Seq[FileStatus]

  • 分区表: 各分区及其对应的过滤后的文件列表
  • 非分区表:没有分区值的单个分区及其文件列表
4.1.2.2、使用 readFile 函数变量 create rdd

根据是否是 bucket 表会调用 FileSourceScanExec#createBucketedReadRDDFileSourceScanExec#createNonBucketedReadRDD 来创建 rdd,我们以

FileSourceScanExec#createNonBucketedReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Seq[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow]

为例,其中 readFile 即上一步得到的 func

主要关注:

  • 一个 parquet file 可以切分为多个片段
  • 一个 FileScanRDD 的一个 partition 可能包含多个 partquet file 片段
  • 一个 FileScanRDD 的一个 partition 至多包含 maxSplitSize 大小的数据

FileScanRDD#compute

伪代码如下:

val iterator = new Iterator[Object] with AutoCloseable {
      def hasNext: Boolean = {}
      def next(): Object = {}
      override def close(): Unit = {}
    }
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.

其中:

  • readCurrentFile(): Iterator[InternalRow] : 读取 currentFile 转化为 Iterator[InternalRow];
    这个 readFile 就是通过 parquetFileFormat.buildReaderWithPartitionValues 得到的
  • nextIterator(): Boolean:若存在下一个 split,将该 split 转为 iterator 设置为 currentIterator 返回 true;否则返回 false
  • hasNext: Boolean :(currentIterator != null && currentIterator.hasNext) || nextIterator()
  • next: Object : currentIterator.next()

4.2、存储为 parquet 哪些场景可以下推?

只有当对最初生成的 FileScanRDD 各个分区的 iterator 调用 take(n) 不影响其最终结果时才能进行下推,各场景总结如下:

4.2.1、samplest limit

SELECT * FROM ${dbTable} LIMIT 10:能下推(对 partition 对应迭代器做 take(n)),当作为 subquery 或 child 时也支持下推

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Relation[id#0L,content#1,dt#2] parquet

== Physical Plan ==
CollectLimit 10
+- FileScan parquet xx_jtest_dev.aaa_test_part1[id#0L,content#1,dt#2] Batched: true, Format: Parquet, Location: CatalogFileIndex[, PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,content:string>

4.2.2、limit with filter

SELECT * FROM (SELECT * FROM ${dbTable} LIMIT 10) a WHERE a.content != 'test':能下推

== Optimized Logical Plan ==
Filter (isnotnull(content#1) && NOT (content#1 = test))
+- GlobalLimit 10
   +- LocalLimit 10
      +- Relation[id#0L,content#1,dt#2] parquet
      
== Physical Plan ==
Filter (isnotnull(content#1) && NOT (content#1 = test))
+- GlobalLimit 10
   +- LocalLimit 10
      +- FileScan parquet xx_jtest_dev.aaa_test_part1[id#0L,content#1,dt#2] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,content:string>


SELECT * FROM ${dbTable} WHERE content != 'test' LIMIT 10:仅当 filters 都是 parquet 支持的才能下推 limit;否则不能下推 limit

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Filter (isnotnull(content#1) && NOT (content#1 = test))
      +- Relation[id#0L,content#1,dt#2] parquet
      
== Physical Plan ==
CollectLimit 10
+- Project [id#0L, content#1, dt#2]
   +- Filter (isnotnull(content#1) && NOT (content#1 = test))
      +- FileScan parquet xx_jtest_dev.aaa_test_part1[id#0L,content#1,dt#2] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [IsNotNull(content), Not(EqualTo(content,test))], ReadSchema: struct<id:bigint,content:string>

4.2.3、limit with aggregation

SELECT COUNT(*) FROM (SELECT * FROM ${dbTable} LIMIT 10)a:能下推

== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#4L]
+- GlobalLimit 10
   +- LocalLimit 10
      +- Project
         +- Relation[id#1L,content#2,dt#3] parquet
         
== Physical Plan ==
HashAggregate(keys=[], functions=[count(1)], output=[count(1)#4L])
+- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#11L])
   +- GlobalLimit 10
      +- LocalLimit 10
         +- Project
            +- FileScan parquet xx_jtest_dev.aaa_test_part1[dt#3] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

SELECT * FROM ${dbTable} ORDER BY content LIMIT 10:不能也没必要下推

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Aggregate [count(1) AS count(1)#4L]
      +- Project
         +- Relation[id#1L,content#2,dt#3] parquet
         
== Physical Plan ==
CollectLimit 10
+- HashAggregate(keys=[], functions=[count(1)], output=[count(1)#4L])
   +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#11L])
      +- Project
         +- FileScan parquet xx_jtest_dev.aaa_test_part1[dt#3] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

4.2.4、limit with order

SELECT * FROM ${dbTable} ORDER BY content LIMIT 10:不能下推,sort 后再 limit 与 limit 后再 sort 结果不同

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Sort [content#1 ASC NULLS FIRST], true
      +- Relation[id#0L,content#1,dt#2] parquet
      
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[content#1 ASC NULLS FIRST], output=[id#0L,content#1,dt#2])
+- FileScan parquet xx_jtest_dev.aaa_test_part1[id#0L,content#1,dt#2] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,content:string>

SELECT * FROM (SELECT * FROM ${dbTable} LIMIT 10)a ORDER BY a.content:能下推

== Optimized Logical Plan ==
Sort [content#1 ASC NULLS FIRST], true
+- GlobalLimit 10
   +- LocalLimit 10
      +- Relation[id#0L,content#1,dt#2] parquet
      
Sort [content#1 ASC NULLS FIRST], true, 0
+- GlobalLimit 10
   +- LocalLimit 10
      +- FileScan parquet xx_jtest_dev.aaa_test_part1[id#0L,content#1,dt#2] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://testspark-compile.inc.test.net:9000/testdata/warehouse/testdb.db/suo..., PartitionCount: 0, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,content:string>

4.2.5、limit with union

参见 LimitPushdown rule,若应用 LimitPushdown rule 能将 limit 下推,则同样也能在同一 side 对应生成 RDD 处对 partition iterator 应用 take(n)

4.2.6、limit with join

参见 LimitPushdown rule,若应用 LimitPushdown rule 能将 limit 下推,则同样也能在同一 side 对应生成 RDD 处对 partition iterator 应用 take(n)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容