Delta Lake 是什么?简单的说就是为大数据场景添加了事务功能,并且支持了 update/delete/merge into 等功能, Delta Lake 初探。
本文是在 Delta Lake 0.4 与 Spark 2.4 集成、平台化过程中的一些实践与思考
SQL 支持
DML
背景
delta lake 0.4 只支持以 api 的方式使用 Delete/Update/Merge Into
等 DML,对习惯了使用 sql 的终端用户会增加其学习使用成本。
解决方式
使用 spark sql extension 以插件化的方式扩展 sql parser ,增加 DML 语法的支持。在 spark 推出 sql extension 功能前,也可以用通过 aspectj 通过拦截 sql 的方式实现增加自定义语法的功能。
- 在自定义扩展 g4 文件中相应的 antlr4 DML 语法,部分参考了 databricks 商业版的语法
statement
: DELETE FROM table=qualifiedName tableAlias
(WHERE where=booleanExpression)? #deleteFromTable
| UPDATE table=qualifiedName tableAlias upset=setClause
(WHERE where=booleanExpression)? #updateTable
| VACUUM table=qualifiedName
(RETAIN number HOURS)? (DRY RUN)? #vacuumTable
| (DESC | DESCRIBE) HISTORY table=qualifiedName
(LIMIT limit=INTEGER_VALUE)? #describeDeltaHistory
| MERGE INTO target=qualifiedName targetAlias=tableAlias
USING (source=qualifiedName |
'(' sourceQuery=query')') sourceAlias=tableAlias
ON mergeCondition=booleanExpression
matchedClause*
notMatchedClause* #mergeIntoTable
- 实现对应的 visit,将 sql 翻译为 delta api,以最简单的 delete 为例
override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
DeleteTableCommand(
visitTableIdentifier(ctx.table),
Option(getText(ctx.where)))
}
case class DeleteTableCommand(table: TableIdentifier,
where: Option[String]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
if (where.isEmpty) {
deltaTable.delete()
} else {
deltaTable.delete(where.get)
}
Seq.empty[Row]
}
}
- 启动 Spark 时加载打包的 extension jar ,初始化 SparkSession 时指定 Extension 类。
val spark = SparkSession.builder
.enableHiveSupport()
.config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")
tip
spark 3 之前不支持配置多个 extension ,如果遇到使用多个 extension 的情况,可以将多个 extension 在一个 extension 代码中进行注入。
以同时增加 tispark extension 和 自定义 extension 为例
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser(TiParser(getOrCreateTiContext))
extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}
Query
识别 delta table 有三种实现方式
- 使用相应表名前缀/后缀作为标识
- 在 table properties 中增加相应的参数进行识别
- 判断表是否存在
_delta_log
我们一开始是使用 delta_ 的前缀作为 delta 表名标识,这样实现最为简单,但是如果用户将 hive(parquet) 表转为 hive(delta) ,要是表名发生变化则需要修改相关代码,所以后面改为在table propertie 中增加相应的参数进行识别。
也可以通过判断是否存在 _delta_log 文件识别,该方式需要在建表时写入带有 schema 信息的空数据。
Query 通过对sql执行进行拦截,判断 Statement 为 SELECT 类型,然后将 delta 表的查询翻译成对应的 api 进行查询。
if (statementType == SELECT) {
TableData tableData = (TableData) statementData.getStatement();
sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}
Insert
Insert 需要考虑 INSERT_VALUES
/ INSERT_SELECT
还有分区表/非分区表以及写入方式的一些情况。
sql 类型判断
if (INSERT_SELECT == statementType) {
isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}
INSERT_INTO 需要从 catalog 中获取对应的 schema 信息,并将 values 转化为 dataFrame
val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
.map(col => col.name + " " + col.dataType)
.collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)
INSERT_SELECT 则直接访问被解析过的 Delta Query 子句。
partition
由于 delta api 的限制,不支持静态分区,在 table 中解析到对应的动态分区名,使用 partitionBy
写入即可。
至此,已经实现使用 apache spark 2.4 使用 sql 直接操作 delta table 表。
平台化工作
与 hive metastore 的集成,表数据管理 等平台化的一些工作。
浏览 delta 数据
用户在平台上点击浏览数据,如果通过 delta api ,启动 spark job 的方式从 HDFS 读取数据,依赖重,延时高,用户体验差。
基于之前在 parquet 格式上的一些工作,浏览操作可以简化为找出 delta 事务日志中还存活 (add - remove) 的 parquet 文件进行读取,这样就避免了启动 spark 的过程,大多数情况能做到毫秒级返回数据。
需要注意的是,_delta_log 文件只存在父目录,浏览某个分区的数据同样需要浏览父目录获取相应分区内的存活文件。
// DeltaHelper.load 方法会从 _delta_log 目录中找到存活 parquet 文件,然后使用 ParquetFileReader 读取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
inputFiles = DeltaHelper.load(dir, conf);
} else {
inputFiles = getInputFilesFromDirectory(projectCode, dir);
}
从 delta 0.5 开始,浏览数据的功能可以通过 manifest 文件进行更简单的实现。参考
Delta Presto Integration & Manifests 机制
元数据兼容
将原生 delta lake 基于 path 的工作方式与 hive metastore 进行兼容。
数据写入/删除
数据动态分区插入 - 统计写入的分区信息(我们是通过修改了 spark write 部分的代码得到的写入分区信息),如果分区不存在则自动增加分区
add partition
。还有一种更简单的做法是直接使用msck repair table
,但是这种方式在分区较多的情况下,性能会非常糟糕。删除分区 - 在界面上操作对某个分区进行删除时,后台调用 delta 删除api,并更新相关 partition 信息。
统计信息更新
元数据中表/分区记录数,大小等元数据的更新支持。
碎片文件整理
非 delta lake 表
小文件整理方式可以参考 Spark 小文件合并优化实践,这种方式采用的是在数据生成后对其进行校验,如果发现碎片文件则进行合并。
小文件整理使用的是同步模式,可能会影响到下游任务的启动时间。
基于 delta lake 的小文件整理要分为两块,存活数据和标记删除的数据
-
标记删除的数据
被 delta 删除的数据,底层 parquet 文件依旧存在,只是在 delta_log 中做了标记,读取时跳过了该文件。
可以使用 delta 自带的 vacuum 功能删除一定时间之前标记删除的数据。
-
存活数据
不断写入的小文件可以基于 delta 的特性,可以实现一个 compaction 功能,然后后台不断的做异步合并,不影响数据的使用方。
结语
一些限制
由于 delta api 的限制,目前 delta delete / update 不支持子句,可以使用 merge into 语法实现相同功能。
merge 使用场景
upsert
有 a1,a2 两张表,如果 a.1eventId = a2.eventId ,则 a2.data 会覆盖 a1.data,否则将 a2 表中相应的数据插入到 a1 表
MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)
ETL 避免数据重复场景
如果 uniqueid 只存在于 a2 表,则插入 a2 表中的相应记录
MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
THEN INSERT *
维度表更新场景
- 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 true ,则删除 a1 表相应记录
- 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 false ,则将 a2 表相应记录的 value 更新到 a1 表中
- 如果没有匹配到相应合作方,且 a2 中 deleted 为 fasle ,则将 a2 表相应记录插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)
历史数据清理场景
如果 a1 和 a2 表的合作方相同,则删除 a1 表中 ds < 20190101 的所有数据
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
DELETE