Delta Lake 的 Delete 功能

Delta Lake 的 Delete 功能是由 0.3.0 版本引入的,参见这里,对应的 Patch 参见这里。在介绍 Apache Spark Delta Lake 实现逻辑之前,我们先来看看如何使用 delete 这个功能。

Delta Lake 删除使用

Delta Lake 的官方文档为我们提供如何使用 Delete 的几个例子,参见这里,如下:

importio.delta.tables._val iteblogDeltaTable = DeltaTable.forPath(spark, path)// 删除 id 小于 4 的数据iteblogDeltaTable.delete("id <= '4'")importorg.apache.spark.sql.functions._importspark.implicits._iteblogDeltaTable.delete($"id"<="4")// 删除所有的数据iteblogDeltaTable.delete()

执行上面的 Delete 命令,如果确实删除了相应的数据,Delta Lake 会生成一个事务日志,内容类似下面的:

{"commitInfo":{"timestamp":1566978478414,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` <= CAST('4' AS BIGINT))\"]"},"readVersion":10,"isBlindAppend":false}}{"remove":{"path":"dt=20190801/part-00000-ca73a0f4-fbeb-4ea8-9b9f-fa466a85724e.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}}{"remove":{"path":"dt=20190803/part-00000-8e11f4cc-a7ac-47a1-8ce6-b9d87eaf6c51.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}}{"add":{"path":"dt=20190801/part-00001-6ff11be3-22db-4ed2-bde3-a97d610fe11d.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566978478000,"dataChange":true}}

事务日志里面详细介绍了 Delete 执行的时间、删除的条件、需要删除的文件以及添加的文件等。

注意:执行 Delete 的时候,真实的数据其实并没有删除,只是在事务日志里面记录了,真正删除数据需要通过执行 vacuum 命令。 在编写本文的时候,开源版本的 Delta Lake 不支持使用 SQL 去删除数据,databricks 的企业版是支持的。在未来版本开源版本的 Delta Lake 也是会支持使用 SQL 删除数据的,但具体版本目前还不确定。

1.0.3.0 版本的 Delta Lake 只支持使用 Scala & Java 去删除 Delta Lake 的数据,Python 相关的 API 可能会在 0.4.0 版本发布,参见:https://github.com/delta-io/delta/issues/89

Delta Lake 删除是如何实现的

前面小结我们简单体验了一下 Delete 的使用,本小结将深入代码详细介绍 Delta Lake 的 Delete 是如何实现的。delete 的 API 是通过在

io.delta.tables.DeltaTable 类添加相应方法实现的,其中涉及删除的方法主要包括下面三个:

def delete(condition: String):Unit = {  delete(functions.expr(condition))}def delete(condition: Column):Unit = {  executeDelete(Some(condition.expr))}def delete():Unit = {  executeDelete(None)}

这个就是我们在上面例子看到的 delete 支持的三种用法。这三个函数最终都是调用 io.delta.tables.execution.DeltaTableOperations#executeDelete 函数的,executeDelete 的实现如下:

protecteddef executeDelete(condition:Option[Expression]):Unit= {valdelete =Delete(self.toDF.queryExecution.analyzed, condition)// current DELETE does not support subquery,// and the reason why perform checking here is that// we want to have more meaningful exception messages,// instead of having some random msg generated by executePlan().subqueryNotSupportedCheck(condition,"DELETE")valqe = sparkSession.sessionState.executePlan(delete)valresolvedDelete = qe.analyzed.asInstanceOf[Delete]valdeleteCommand =DeleteCommand(resolvedDelete)  deleteCommand.run(sparkSession)}

self.toDF.queryExecution.analyzed 这个就是我们输入 Delta Lake 表的 Analyzed Logical Plan,condition 就是我们执行删除操作的条件表达式(也就是上面例子的 id < = '4')。这个方法的核心就是初始化 DeleteCommand,然后调用 DeleteCommand 的 run 方法执行删除操作。DeleteCommand 类扩展自 Spark 的 RunnableCommand 特质,并实现其中的 run 方法,我们在 Spark 里面看到的 CREATE TABLE、ALTER TABLE、SHOE CREATE TABLE 等命令都是继承这个类的,所以 Delta Lake 的 delete、update 以及 Merge 也都是继承这个类。DeleteCommand 的 run 方法实现如下:

finaloverridedef run(sparkSession:SparkSession):Seq[Row] = {  recordDeltaOperation(tahoeFileIndex.deltaLog,"delta.dml.delete") {// 获取事务日志持有对象valdeltaLog = tahoeFileIndex.deltaLog// 检查 Delta Lake 表是否支持删除操作deltaLog.assertRemovable()// 开启新事务,执行删除操作。deltaLog.withNewTransaction { txn =>      performDelete(sparkSession, deltaLog, txn)    }// Re-cache all cached plans(including this relation itself, if it's cached) that refer to// this data source relation.sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)  }Seq.empty[Row]}

Delta Lake 表允许用户设置成 appendOnly(通过 spark.databricks.delta.properties.defaults.appendOnly 参数设置),也就是只允许追加操作,所以如果我们执行删除之前需要做一些校验。校验通过之后开始执行删除操作,由于删除操作是需要保证原子性的,所以这个操作需要在事务里面进行,withNewTransaction 的实现如下:

def withNewTransaction[T](thunk:OptimisticTransaction=>T):T= {try{// 更新当前表事务日志的快照update()// 初始化乐观事务锁对象valtxn =newOptimisticTransaction(this)// 开启事务OptimisticTransaction.setActive(txn)// 执行写数据操作thunk(txn)  }finally{// 关闭事务OptimisticTransaction.clearActive()  }}

在开启事务之前,需要更新当前表事务日志的快照,因为在执行删除操作表之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 这个就是执行我们上面的 performDelete(sparkSession, deltaLog, txn) 方法。Delta Lake 删除的整个核心就在 performDelete 方法里面了。

如果某个文件里面有数据需要删除,那么这个文件会被标记为删除,然后这个文件里面不需要删除的数据需要重新写到一个新文件里面。那么在 performDelete 方法里面我们就需要知道哪些数据需要删除,这些数据对应的文件在哪里以及是否需要些事务日志。Delta Lake 将删除实现分为三大情况:

1、如果执行 delete 的时候并没有传递相关的删除条件,也就是上面例子的 iteblogDeltaTable.delete(),这时候其实就是删除当前 Delta Lake 表的所有数据。那这种情况最好处理了,只需要直接删除 Delta Lake 表对应的所有文件即可; 2、如果执行 delete 的时候传递了相关删除条件,而这个删除条件只是分区字段,比如 dt 是 Delta Lake 表的分区字段,然后我们执行了 iteblogDeltaTable.delete("dt = '20190828'") 这样相关的删除操作,那么我们可以直接从缓存在内存中的快照(snapshot, 也就是通过上面的 update() 函数初始化的)拿到需要删除哪些文件,直接删除即可,而且不需要执行数据重写操作。 3、最后一种情况就是用户删除的时候含有一些非分区字段的过滤条件,这时候我们就需要扫描底层数据,获取需要删除的数据在哪个文件里面,这又分两种情况: 3.1、Delta Lake 表并不存在我们需要删除的数据,这时候不需要做任何操作,直接返回,就连事务日志都不用记录; 3.2、这种情况是最复杂的,我们需要计算需要删除的数据在哪个文件里面,然后把对应的文件里面不需要删除的数据重写到新的文件里面(如果没有,就不生成新文件),最后记录事务日志。 为了加深印象,我画了一张图希望大家能够理解上面的过程。

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop 上图中每个绿色的框代表一个分区目录下的文件,红色代表标记为删除的文件,也就是事务日志中使用 remove 标记的文件,紫色代表移除需要删除的数据之后新生成的文件,也就是事务日志里面使用 add 标记的文件。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容