Delta Lake当前版本(0.5)只支持API操作的,但是实现 Insert SQL 语法也不难,可以参考 Delta Lake 平台化实践(离线篇),需要注意的是 Delta Lake 中的分区表覆盖写入操作。
INSERT OVERWRITE
INSERT OVERWRITE TABLE
分区表时分三种场景:
- 动态分区 - 写入前会删除所有分区,并根据数据中的分区字段写入相应新分区
- 静态分区 - 只会对指定的分区进行覆盖写操作
- 混合分区(动态+静态分区) - 上述两种情况的结合
如果想通过 SQL 转化为上述 API ,首先需要在 sql parser 的时候获取到 insertMode 和 partitions 信息,并将 partitions 信息存在一个有序的结构中,例如 LinkedHashMap。然后利用这些信息,就可以拼装进行拼装实现上述三种场景。
动态分区
对所有 ds 分区进行覆盖写操作,将会清空所有 ds 分区
sql
INSERT OVERWRITE TABLE db.tableA partition(ds) select name,ds from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite").partitionBy(ds)
静态分区
对 ds=20200101 的分区进行覆盖写操作,如果数据中没有分区字段,需要使用 withColumn 增加相应数据。
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101) select name from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite").option("replaceWhere", "ds = 20200101").partitionBy(ds)
tips: Delta 不能直接将数据写入分区目录,因为所有的 _Delta_Log 都存在表的根目录下。
混合分区
对 ds=20200101 中的所有 event 的分区进行覆盖写操作,将会清空所有 event 分区
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101,event) select name,event from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite").option("replaceWhere", "ds = 20200101").partitionBy(ds,event)
后记
分区操作,一定要保证 partition 信息的有序
-
新表需要从 hive metastore 中获取 partition 信息,Delta Table 在第一次写入数据前,是不会生成
_DELTA_LOG
目录的,此时可以从 hive metastore 中获取建表时的分区名和其对应的类型,例如://ddl: `ds` INT COMMENT 'ds' val ddl = spark.sharedState.externalCatalog.getTable(dbName, tableName).partitionSchema.toDDL val partitionNameAndType = new mutable.HashMap[String, String]() ddl.split(",").foreach { r => val x = r.split(" ") partitionNameAndType.put(x(0).replace("`", ""), x(1)) }
-
语义不同
Hive Table 直接使用 insert overwrite 动态分区只会覆盖数据涉及到的分区,而 Spark 和 Delta Lake 的 API 则会将所有所有分区进行覆盖。Spark 2.3 以后也可以通过下述API实现 Hive insert overwrite 语义
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.mode("overwrite").insertInto("partitioned_table")
-
动态分区覆盖写是高危操作
该操作很有可能会删除一些你不期望的数据,所以 Delta Lake 目前的 API 提供了 replaceWhere option 进行约束