Delta Lake
在说 Delta Lake 之前,要先提一下 Data Lake ,Data Lake 的主要思想是将企业中的所有数据进行统一管理。例如基于 Hadoop 的 Data Lake 方案可以非常低成本的存储所有类型的数据。
基于 hadoop 的方案只支持批量插入,且用户读取时可能无法获取最新数据,多用户同时进行写操作还会发生异常,带来脏数据的问题,并不可靠,并且更新及删除操作非常困难 (需要整块重写),无法保证数据一致性 (例如 spark 读取时缓存了 parquet 元数据,若元数据变化需要进行 refresh)。
Delta Lake 不仅能解决上述问题,还能对数据进行各种增强,例如 time travel 等。
Delta Lake 在近期由 Databricks 开源并发布了其第一个 release 版本,借助了开源社区的力量,底层使用 parquet 文件格式,在其上通过 transaction log
(json 格式) 记录更新操作从而提供了 ACID 事务等能力,通过与 spark 集成,得以处理大量的元数据。
Delta Lake 可以更简单的理解为构建在 Data Lake 上的一层数据库。巧妙的地方在于它不是一种 file format
而是 table format
,这意味着你不需要修改底层的存储文件,它解放了用户花费在数据一致性上的大量时间,进行数据分析时也无需关心表中的数据是来自流数据还是历史数据。此外,通过时间旅行的功能,还可以实现事件回放等功能。
Delta lake 相关特性在官网上已经说的非常详细,不再赘述,根据笔者自己的理解简单描述一下五个比较重要的特性:
- ACID transactions:在数据上的操作变得可靠,因此能提供 Update/Delete/Merge into 的能力。
- Schema enforcement: 提供了指定模式并强制执行它的能力。这有助于确保数据类型正确并且存在必需的列,防止错误数据导致数据损坏。
- Streaming and batch unification:统一了流和批处理,在以前,通过 spark streaming 可以处理当前事件,但是无法处理历史数据或是进行一些复杂的机器学习分析。这个时候大家可能会想到 lambda 架构,但是其实现及运维却异常繁琐。在 Delta Lake 中,不论表是流式追加还是静态,不同的用户同时对它进行读写操作,都是透明的,不会发生异常和并发问题。这意味着你可以直接查询当前最新的数据或是历史的数据。
- Scalable metadata handling:在大数据场景,元数据本身也可能成为大数据(例如一些大分区表的元数据)。如果将大量的元数据操作放在传统的 hive metastore 执行,可能会非常缓慢,因此可以通过 Spark 强大的分布式处理能力来解决这个问题(可以像处理数据一样处理元数据)。
- Time travel:时间旅行,它意味着用户可以提供时间戳对早期版本的数据进行审核、回滚或重现。比如可以应用在机器学习的事件回放场景(不用担心由于数据更新导致多次运行时 input 数据不一致)。
Transactional metadata 实现
在文件上增加一个日志结构化存储(transaction log),该日志有(ordered) 且保持原子性(atomic)。
增加或者删除数据时,都会产生一条描述文件(commit),通过采用乐观并发控制 (optimistic concurrency control) 保证用户并发操作时数据的一致性。
并发控制
Delta Lake 在读写中提供了 ACID 事务保证:
- 即使多个任务同时对一张表进行写入操作,此时也能看到表的一致性快照,同时这些写入操作将会根据时序串行执行。
- 即使在 spark 作业执行期间修改了数据,任务也能读取一致性快照。
乐观并发控制(optimistic concurrency control)
Delta Lake 使用 optimistic concurrency control 机制提供写数据时的事务保证,写过程包含三个步骤:
- Read: 读取表最新可用版本,以确定哪些文件需要被重新定义。
- Write: 将所有的修改写成新的数据文件。
- Validate and commit: 在提交最终变更之前,检查自读取该快照以来,是否和其他并发提交的变更发生冲突。如果没有冲突,所有的变更会被提交并生成一个新版本快照,写操作成功。如果发现了冲突,该写入操作会抛出异常后失败,而不会像开源版的 spark 一样破坏数据。
使用
在 spark 中,为了提高查询性能,在读取表时会缓存其文件的元数据(包含一些路径信息),一旦表进行了更新操作,必须要手动刷新元数据(例如 spark.sql(refresh table xxx)
)才能保证数据读取到最新数据,如果是删除操作,还会触发文件找不到错误。
因为 delta Lake 的特性,所以无需在数据更新后调用 refresh
,目前仅支持 HDFS 作为底层存储
// 将已有 parquet 数据转化为 delta table
val df = spark.read.parquet("/path/to/your/data")
df.write.format("delta").save("/delta/delta_table")
spark.sql("CREATE TABLE delta_table USING DELTA LOCATION '/delta/delta_table'")
delta 表根目录 _delta_log
下会生成 json 格式的事务文件
hdfs dfs -cat /delta/delta_table/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1555989622032,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"f33f4754-7c0c-43b8-87a2-ddc21e1311ea","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"comment\":\"\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1555989621467}}
结语
Delta Lake / Iceberg 使用 table format 的方式能够解决大数据处理的很多痛点。受限于 spark2.4 的 sql 的限制,目前 DML 只支持 api 的方式调用,后续的 sql 支持需要等到 spark3 release 后社区进行支持。