Spark 有类型Dataset API谓词下推问题

本文翻译自 Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?
译者:耐心的农夫2020

与无类型的DataFrame API相比,为什么有类型的Dataset API没有使用谓词下推?

问题

我一直认为dataset和dataframe API是相同的,两者间的唯一差异是dataset API可以提供编译时安全性。这样的理解对吗?

我有一个非常简单的例子,如下所示。

case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // 让我们来找一下出生在1999年的运动员
 // 下面的代码会正常运行, 并且你也会有编译时安全性... 但是它没有使用谓词下推!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // 下面的代码会正常运行,并且使用谓词下推
 // 但是你无法拥有编译时安全性 :(
 playersDs.filter('birthYear === 1999).explain()

第一个例子的explain操作显示它没有做谓词下推(因为PushedFilters是空的)

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

而第二个例子的explain操作显示它使用了谓词下推(看PushedFilters是非空的)

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

我的问题是,我该如何使用Dataset API才能确保既有编译时安全性,同时还能获得谓词下推?这可能吗?如果不可能,那是否意味着Dataset API在确保编译时安全性的时候牺牲了性能(而DataFrame这种情况下会更快,尤其是处理很大的parquet文件的时候)?

答案

为了理解Dataset[T]DataFrame(即Dataset[Row])之间的区别,你应该记住在你物理计划中的下面这一行。

Filter <function1>.apply

我一直在说人们应该放弃使用有类型的Dataset API而选择使用无类型的DataFrame API,因为在很多情况下,Scala代码对优化器来说是个黑盒子。你刚刚已经碰到了其中一种情况。还可以考虑一下所有对象的反序列化。为了避免GC,SparkSQL尽可能避免把对象存在JVM中。每次你访问对象的时候,你实际上会让SparkSQL反序列化对象,将这些对象加载到JVM中,这对GC会造成很大的压力。(相对无类型的DataFrame API,有类型的Dataset API会更多触发反序列化操作)。

可以参考 UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice


引用 Reynold Xin after I asked the very same question on dev@spark.a.o mailing list

UDF是一个黑盒子,Spark无法确切知道它在处理什么。在一些简单情况下,我们可以分析UDF的字节码,并且推断它在做什么。但总体上很难做到。

JIRA上有个ticket描述了这种情况SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions,但是正如某人(我想是twitter上的Adam B)所说,期待Spark在任何时候都能这么做是个笑话。

Dataset API的一大优势是类型安全,但由于严重依赖用户定义的闭包/lamda表达式,牺牲了性能。这些闭包一般会比表达式慢,因为我们有更多的灵活性来优化表达式(因为知道数据类型,没有虚拟函数调用等)。在很多情况下,查看这些闭包的字节码并弄清楚它们在做什么并不困难。如果我们能够理解这些闭包,那么我们就能这些闭包直接转换成Catalyst 表达式,从而获得更加优化的执行速度。


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

上面的代码等价于下面的代码

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt就是你忽略了优化,让Spark Optimizer跳过它的地方。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容